# monitor_db_unittest.py, revision d1ee1dd3f3e5ac44f00d7a96deb815dbe1beedad
#!/usr/bin/python

import unittest, time, subprocess, os, StringIO, tempfile, datetime, shutil
import MySQLdb
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend import setup_test_environment
from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.database import database_connection, migrate
from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
from autotest_lib.scheduler import scheduler_config

_DEBUG = False


class DummyAgent(object):
    """Minimal stand-in for a scheduler Agent, used by dispatcher tests."""
    _is_running = False
    _is_done = False
    num_processes = 1
    # NOTE(review): these are shared class-level lists; the tests only read
    # them, never mutate, so sharing is harmless here.
    host_ids = []
    queue_entry_ids = []

    def is_running(self):
        return self._is_running


    def tick(self):
        self._is_running = True


    def is_done(self):
        return self._is_done


    def set_done(self, done):
        self._is_done = done
        self._is_running = not done


class IsRow(mock.argument_comparator):
    """Mock comparator matching a DB row by its id (first column)."""
    def __init__(self, row_id):
        self.row_id = row_id


    def is_satisfied_by(self, parameter):
        return list(parameter)[0] == self.row_id


    def __str__(self):
        return 'row with id %s' % self.row_id


class BaseSchedulerTest(unittest.TestCase):
    """
    Common fixture for scheduler tests: a temporary test database,
    stubbed-out drone manager paths, and basic hosts/labels/ACL data.
    """
    _config_section = 'AUTOTEST_WEB'
    _test_db_initialized = False

    def _do_query(self, sql):
        self._database.execute(sql)


    @classmethod
    def _initialize_test_db(cls):
        # Create and sync the test DB only once per process; subsequent
        # tests restore it from the backup instead of re-running syncdb.
        if cls._test_db_initialized:
            return
        temp_fd, cls._test_db_file = tempfile.mkstemp(suffix='.monitor_test')
        os.close(temp_fd)
        setup_test_environment.set_test_database(cls._test_db_file)
        setup_test_environment.run_syncdb()
        cls._test_db_backup = setup_test_environment.backup_test_database()
        cls._test_db_initialized = True


    def _open_test_db(self):
        self._initialize_test_db()
        setup_test_environment.restore_test_database(self._test_db_backup)
        self._database = (
            database_connection.DatabaseConnection.get_test_database(
                self._test_db_file))
        self._database.connect()
        self._database.debug = _DEBUG


    def _close_test_db(self):
        self._database.disconnect()


    def _set_monitor_stubs(self):
        # Point the scheduler module at the test DB and at fake result paths.
        monitor_db._db = self._database
        monitor_db._drone_manager._results_dir = '/test/path'
        monitor_db._drone_manager._temporary_directory = '/test/path/tmp'


    def _fill_in_test_data(self):
        user = models.User.objects.create(login='my_user')
        acl_group = models.AclGroup.objects.create(name='my_acl')
        acl_group.users.add(user)

        hosts = [models.Host.objects.create(hostname=hostname) for hostname in
                 ('host1', 'host2', 'host3', 'host4')]
        acl_group.hosts = hosts
        models.AclGroup.smart_get('Everyone').hosts = []

        labels = [models.Label.objects.create(name=name) for name in
                  ('label1', 'label2', 'label3')]
        labels[2].only_if_needed = True
        labels[2].save()
        hosts[0].labels.add(labels[0])
        hosts[1].labels.add(labels[1])


    def _setup_dummy_user(self):
        user = models.User.objects.create(login='dummy', access_level=100)
        thread_local.set_user(user)


    def setUp(self):
        self.god = mock.mock_god()
        self._open_test_db()
        self._fill_in_test_data()
        self._set_monitor_stubs()
        self._dispatcher = monitor_db.Dispatcher()
        self._setup_dummy_user()


    def tearDown(self):
        self._close_test_db()
        self.god.unstub_all()


    def _create_job(self, hosts=None, metahosts=None, priority=0, active=False,
                    synchronous=False):
        """
        Create a test Job with queue entries for the given host ids
        (non-metahost) and label ids (metahost).  Returns the Job.
        """
        # Use None sentinels instead of mutable default arguments ([]),
        # which would be shared across calls.
        hosts = hosts or []
        metahosts = metahosts or []
        synch_count = synchronous and 2 or 1
        created_on = datetime.datetime(2008, 1, 1)
        status = models.HostQueueEntry.Status.QUEUED
        if active:
            status = models.HostQueueEntry.Status.RUNNING
        job = models.Job.objects.create(
            name='test', owner='my_user', priority=priority,
            synch_count=synch_count, created_on=created_on,
            reboot_before=models.RebootBefore.NEVER)
        for host_id in hosts:
            models.HostQueueEntry.objects.create(job=job, priority=priority,
                                                 host_id=host_id,
                                                 status=status)
            models.IneligibleHostQueue.objects.create(job=job,
                                                      host_id=host_id)
        for label_id in metahosts:
            models.HostQueueEntry.objects.create(job=job, priority=priority,
                                                 meta_host_id=label_id,
                                                 status=status)
        return job


    def _create_job_simple(self, hosts, use_metahost=False,
                           priority=0, active=False):
        'An alternative interface to _create_job'
        args = {'hosts' : [], 'metahosts' : []}
        if use_metahost:
            args['metahosts'] = hosts
        else:
            args['hosts'] = hosts
        return self._create_job(priority=priority, active=active, **args)


    def _update_hqe(self, set, where=''):
        # 'set' shadows the builtin but is part of the established call
        # convention (tests call _update_hqe(set=...)), so keep the name.
        query = 'UPDATE host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)


class DispatcherSchedulingTest(BaseSchedulerTest):
    _jobs_scheduled = []

    def _set_monitor_stubs(self):
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()
        def run_stub(hqe_self, assigned_host=None):
            hqe_self.set_status('Starting')
            if hqe_self.meta_host:
                host = assigned_host
            else:
                host = hqe_self.host
            self._record_job_scheduled(hqe_self.job.id, host.id)
            return DummyAgent()
        monitor_db.HostQueueEntry.run = run_stub


    def _record_job_scheduled(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record not in self._jobs_scheduled,
                     'Job %d scheduled on host %d twice' %
                     (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record in self._jobs_scheduled,
                     'Job %d not scheduled on host %d as expected\n'
                     'Jobs scheduled: %s' %
                     (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)
204 205 def _check_for_extra_schedulings(self): 206 if len(self._jobs_scheduled) != 0: 207 self.fail('Extra jobs scheduled: ' + 208 str(self._jobs_scheduled)) 209 210 211 def _convert_jobs_to_metahosts(self, *job_ids): 212 sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')' 213 self._do_query('UPDATE host_queue_entries SET ' 214 'meta_host=host_id, host_id=NULL ' 215 'WHERE job_id IN ' + sql_tuple) 216 217 218 def _lock_host(self, host_id): 219 self._do_query('UPDATE hosts SET locked=1 WHERE id=' + 220 str(host_id)) 221 222 223 def setUp(self): 224 super(DispatcherSchedulingTest, self).setUp() 225 self._jobs_scheduled = [] 226 227 228 def _test_basic_scheduling_helper(self, use_metahosts): 229 'Basic nonmetahost scheduling' 230 self._create_job_simple([1], use_metahosts) 231 self._create_job_simple([2], use_metahosts) 232 self._dispatcher._schedule_new_jobs() 233 self._assert_job_scheduled_on(1, 1) 234 self._assert_job_scheduled_on(2, 2) 235 self._check_for_extra_schedulings() 236 237 238 def _test_priorities_helper(self, use_metahosts): 239 'Test prioritization ordering' 240 self._create_job_simple([1], use_metahosts) 241 self._create_job_simple([2], use_metahosts) 242 self._create_job_simple([1,2], use_metahosts) 243 self._create_job_simple([1], use_metahosts, priority=1) 244 self._dispatcher._schedule_new_jobs() 245 self._assert_job_scheduled_on(4, 1) # higher priority 246 self._assert_job_scheduled_on(2, 2) # earlier job over later 247 self._check_for_extra_schedulings() 248 249 250 def _test_hosts_ready_helper(self, use_metahosts): 251 """ 252 Only hosts that are status=Ready, unlocked and not invalid get 253 scheduled. 
254 """ 255 self._create_job_simple([1], use_metahosts) 256 self._do_query('UPDATE hosts SET status="Running" WHERE id=1') 257 self._dispatcher._schedule_new_jobs() 258 self._check_for_extra_schedulings() 259 260 self._do_query('UPDATE hosts SET status="Ready", locked=1 ' 261 'WHERE id=1') 262 self._dispatcher._schedule_new_jobs() 263 self._check_for_extra_schedulings() 264 265 self._do_query('UPDATE hosts SET locked=0, invalid=1 ' 266 'WHERE id=1') 267 self._dispatcher._schedule_new_jobs() 268 if not use_metahosts: 269 self._assert_job_scheduled_on(1, 1) 270 self._check_for_extra_schedulings() 271 272 273 def _test_hosts_idle_helper(self, use_metahosts): 274 'Only idle hosts get scheduled' 275 self._create_job(hosts=[1], active=True) 276 self._create_job_simple([1], use_metahosts) 277 self._dispatcher._schedule_new_jobs() 278 self._check_for_extra_schedulings() 279 280 281 def _test_obey_ACLs_helper(self, use_metahosts): 282 self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1') 283 self._create_job_simple([1], use_metahosts) 284 self._dispatcher._schedule_new_jobs() 285 self._check_for_extra_schedulings() 286 287 288 def _test_only_if_needed_labels_helper(self, use_metahosts): 289 # apply only_if_needed label3 to host1 290 label3 = models.Label.smart_get('label3') 291 models.Host.smart_get('host1').labels.add(label3) 292 293 job = self._create_job_simple([1], use_metahosts) 294 # if the job doesn't depend on label3, there should be no scheduling 295 self._dispatcher._schedule_new_jobs() 296 self._check_for_extra_schedulings() 297 298 # now make the job depend on label3 299 job.dependency_labels.add(label3) 300 self._dispatcher._schedule_new_jobs() 301 self._assert_job_scheduled_on(1, 1) 302 self._check_for_extra_schedulings() 303 304 if use_metahosts: 305 # should also work if the metahost is the only_if_needed label 306 self._do_query('DELETE FROM jobs_dependency_labels') 307 self._create_job(metahosts=[3]) 308 self._dispatcher._schedule_new_jobs() 309 
self._assert_job_scheduled_on(2, 1) 310 self._check_for_extra_schedulings() 311 312 313 def test_basic_scheduling(self): 314 self._test_basic_scheduling_helper(False) 315 316 317 def test_priorities(self): 318 self._test_priorities_helper(False) 319 320 321 def test_hosts_ready(self): 322 self._test_hosts_ready_helper(False) 323 324 325 def test_hosts_idle(self): 326 self._test_hosts_idle_helper(False) 327 328 329 def test_obey_ACLs(self): 330 self._test_obey_ACLs_helper(False) 331 332 333 def test_only_if_needed_labels(self): 334 self._test_only_if_needed_labels_helper(False) 335 336 337 def test_non_metahost_on_invalid_host(self): 338 """ 339 Non-metahost entries can get scheduled on invalid hosts (this is how 340 one-time hosts work). 341 """ 342 self._do_query('UPDATE hosts SET invalid=1') 343 self._test_basic_scheduling_helper(False) 344 345 346 def test_metahost_scheduling(self): 347 """ 348 Basic metahost scheduling 349 """ 350 self._test_basic_scheduling_helper(True) 351 352 353 def test_metahost_priorities(self): 354 self._test_priorities_helper(True) 355 356 357 def test_metahost_hosts_ready(self): 358 self._test_hosts_ready_helper(True) 359 360 361 def test_metahost_hosts_idle(self): 362 self._test_hosts_idle_helper(True) 363 364 365 def test_metahost_obey_ACLs(self): 366 self._test_obey_ACLs_helper(True) 367 368 369 def test_metahost_only_if_needed_labels(self): 370 self._test_only_if_needed_labels_helper(True) 371 372 373 def test_nonmetahost_over_metahost(self): 374 """ 375 Non-metahost entries should take priority over metahost entries 376 for the same host 377 """ 378 self._create_job(metahosts=[1]) 379 self._create_job(hosts=[1]) 380 self._dispatcher._schedule_new_jobs() 381 self._assert_job_scheduled_on(2, 1) 382 self._check_for_extra_schedulings() 383 384 385 def test_metahosts_obey_blocks(self): 386 """ 387 Metahosts can't get scheduled on hosts already scheduled for 388 that job. 
389 """ 390 self._create_job(metahosts=[1], hosts=[1]) 391 # make the nonmetahost entry complete, so the metahost can try 392 # to get scheduled 393 self._update_hqe(set='complete = 1', where='host_id=1') 394 self._dispatcher._schedule_new_jobs() 395 self._check_for_extra_schedulings() 396 397 398 def test_only_schedule_queued_entries(self): 399 self._create_job(metahosts=[1]) 400 self._update_hqe(set='active=1, host_id=2') 401 self._dispatcher._schedule_new_jobs() 402 self._check_for_extra_schedulings() 403 404 405 def test_no_ready_hosts(self): 406 self._create_job(hosts=[1]) 407 self._do_query('UPDATE hosts SET status="Repair Failed"') 408 self._dispatcher._schedule_new_jobs() 409 self._check_for_extra_schedulings() 410 411 412class DispatcherThrottlingTest(BaseSchedulerTest): 413 """ 414 Test that the dispatcher throttles: 415 * total number of running processes 416 * number of processes started per cycle 417 """ 418 _MAX_RUNNING = 3 419 _MAX_STARTED = 2 420 421 def setUp(self): 422 super(DispatcherThrottlingTest, self).setUp() 423 scheduler_config.config.max_running_processes = self._MAX_RUNNING 424 scheduler_config.config.max_processes_started_per_cycle = ( 425 self._MAX_STARTED) 426 427 428 def _setup_some_agents(self, num_agents): 429 self._agents = [DummyAgent() for i in xrange(num_agents)] 430 self._dispatcher._agents = list(self._agents) 431 432 433 def _run_a_few_cycles(self): 434 for i in xrange(4): 435 self._dispatcher._handle_agents() 436 437 438 def _assert_agents_started(self, indexes, is_started=True): 439 for i in indexes: 440 self.assert_(self._agents[i].is_running() == is_started, 441 'Agent %d %sstarted' % 442 (i, is_started and 'not ' or '')) 443 444 445 def _assert_agents_not_started(self, indexes): 446 self._assert_agents_started(indexes, False) 447 448 449 def test_throttle_total(self): 450 self._setup_some_agents(4) 451 self._run_a_few_cycles() 452 self._assert_agents_started([0, 1, 2]) 453 self._assert_agents_not_started([3]) 454 455 456 
def test_throttle_per_cycle(self): 457 self._setup_some_agents(3) 458 self._dispatcher._handle_agents() 459 self._assert_agents_started([0, 1]) 460 self._assert_agents_not_started([2]) 461 462 463 def test_throttle_with_synchronous(self): 464 self._setup_some_agents(2) 465 self._agents[0].num_processes = 3 466 self._run_a_few_cycles() 467 self._assert_agents_started([0]) 468 self._assert_agents_not_started([1]) 469 470 471 def test_large_agent_starvation(self): 472 """ 473 Ensure large agents don't get starved by lower-priority agents. 474 """ 475 self._setup_some_agents(3) 476 self._agents[1].num_processes = 3 477 self._run_a_few_cycles() 478 self._assert_agents_started([0]) 479 self._assert_agents_not_started([1, 2]) 480 481 self._agents[0].set_done(True) 482 self._run_a_few_cycles() 483 self._assert_agents_started([1]) 484 self._assert_agents_not_started([2]) 485 486 487 def test_zero_process_agent(self): 488 self._setup_some_agents(5) 489 self._agents[4].num_processes = 0 490 self._run_a_few_cycles() 491 self._assert_agents_started([0, 1, 2, 4]) 492 self._assert_agents_not_started([3]) 493 494 495class FindAbortTest(BaseSchedulerTest): 496 """ 497 Test the dispatcher abort functionality. 
498 """ 499 def _check_abort_agent(self, agent, entry_id): 500 self.assert_(isinstance(agent, monitor_db.Agent)) 501 tasks = list(agent.queue.queue) 502 self.assertEquals(len(tasks), 1) 503 abort = tasks[0] 504 505 self.assert_(isinstance(abort, monitor_db.AbortTask)) 506 self.assertEquals(abort.queue_entry.id, entry_id) 507 508 509 def _check_host_agent(self, agent, host_id): 510 self.assert_(isinstance(agent, monitor_db.Agent)) 511 tasks = list(agent.queue.queue) 512 self.assertEquals(len(tasks), 2) 513 cleanup, verify = tasks 514 515 self.assert_(isinstance(cleanup, monitor_db.CleanupTask)) 516 self.assertEquals(cleanup.host.id, host_id) 517 518 self.assert_(isinstance(verify, monitor_db.VerifyTask)) 519 self.assertEquals(verify.host.id, host_id) 520 521 522 def _check_agents(self, agents, include_host_tasks): 523 agents = list(agents) 524 if include_host_tasks: 525 self.assertEquals(len(agents), 4) 526 self._check_host_agent(agents.pop(0), 1) 527 self._check_host_agent(agents.pop(1), 2) 528 529 self.assertEquals(len(agents), 2) 530 self._check_abort_agent(agents[0], 1) 531 self._check_abort_agent(agents[1], 2) 532 533 534 def test_find_aborting_inactive(self): 535 self._create_job(hosts=[1, 2]) 536 self._update_hqe(set='status="Abort"') 537 538 self._dispatcher._find_aborting() 539 540 self._check_agents(self._dispatcher._agents, include_host_tasks=False) 541 self.god.check_playback() 542 543 544 def test_find_aborting_active(self): 545 self._create_job(hosts=[1, 2]) 546 self._update_hqe(set='status="Abort", active=1') 547 # have to make an Agent for the active HQEs 548 agent = self.god.create_mock_class(monitor_db.Agent, 'old_agent') 549 agent.host_ids = agent.queue_entry_ids = [1, 2] 550 self._dispatcher.add_agent(agent) 551 552 self._dispatcher._find_aborting() 553 554 self._check_agents(self._dispatcher._agents, include_host_tasks=True) 555 self.god.check_playback() 556 557 # ensure agent gets aborted 558 abort1 = self._dispatcher._agents[1].queue.queue[0] 
559 self.assertEquals(abort1.agents_to_abort, [agent]) 560 abort2 = self._dispatcher._agents[3].queue.queue[0] 561 self.assertEquals(abort2.agents_to_abort, []) 562 563 564class JobTimeoutTest(BaseSchedulerTest): 565 def _test_synch_start_timeout_helper(self, expect_abort, 566 set_created_on=True, set_active=True, 567 set_acl=True): 568 scheduler_config.config.synch_job_start_timeout_minutes = 60 569 job = self._create_job(hosts=[1, 2]) 570 if set_active: 571 hqe = job.hostqueueentry_set.filter(host__id=1)[0] 572 hqe.status = 'Pending' 573 hqe.active = 1 574 hqe.save() 575 576 everyone_acl = models.AclGroup.smart_get('Everyone') 577 host1 = models.Host.smart_get(1) 578 if set_acl: 579 everyone_acl.hosts.add(host1) 580 else: 581 everyone_acl.hosts.remove(host1) 582 583 job.created_on = datetime.datetime.now() 584 if set_created_on: 585 job.created_on -= datetime.timedelta(minutes=100) 586 job.save() 587 588 self._dispatcher._abort_jobs_past_synch_start_timeout() 589 590 for hqe in job.hostqueueentry_set.all(): 591 if expect_abort: 592 self.assert_(hqe.status in ('Abort', 'Aborted'), hqe.status) 593 else: 594 self.assert_(hqe.status not in ('Abort', 'Aborted'), hqe.status) 595 596 597 def test_synch_start_timeout_helper(self): 598 # no abort if any of the condition aren't met 599 self._test_synch_start_timeout_helper(False, set_created_on=False) 600 self._test_synch_start_timeout_helper(False, set_active=False) 601 self._test_synch_start_timeout_helper(False, set_acl=False) 602 # abort if all conditions are met 603 self._test_synch_start_timeout_helper(True) 604 605 606class PidfileRunMonitorTest(unittest.TestCase): 607 execution_tag = 'test_tag' 608 pid = 12345 609 process = drone_manager.Process('myhost', pid) 610 num_tests_failed = 1 611 612 def setUp(self): 613 self.god = mock.mock_god() 614 self.mock_drone_manager = self.god.create_mock_class( 615 drone_manager.DroneManager, 'drone_manager') 616 self.god.stub_with(monitor_db, '_drone_manager', 617 
self.mock_drone_manager) 618 self.god.stub_function(email_manager.manager, 'enqueue_notify_email') 619 620 self.pidfile_id = object() 621 622 self.mock_drone_manager.get_pidfile_id_from.expect_call( 623 self.execution_tag).and_return(self.pidfile_id) 624 self.mock_drone_manager.register_pidfile.expect_call(self.pidfile_id) 625 626 self.monitor = monitor_db.PidfileRunMonitor() 627 self.monitor.attach_to_existing_process(self.execution_tag) 628 629 630 def tearDown(self): 631 self.god.unstub_all() 632 633 634 def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None, 635 use_second_read=False): 636 contents = drone_manager.PidfileContents() 637 if pid is not None: 638 contents.process = drone_manager.Process('myhost', pid) 639 contents.exit_status = exit_code 640 contents.num_tests_failed = tests_failed 641 self.mock_drone_manager.get_pidfile_contents.expect_call( 642 self.pidfile_id, use_second_read=use_second_read).and_return( 643 contents) 644 645 646 def set_not_yet_run(self): 647 self.setup_pidfile() 648 649 650 def set_empty_pidfile(self): 651 self.setup_pidfile() 652 653 654 def set_running(self, use_second_read=False): 655 self.setup_pidfile(self.pid, use_second_read=use_second_read) 656 657 658 def set_complete(self, error_code, use_second_read=False): 659 self.setup_pidfile(self.pid, error_code, self.num_tests_failed, 660 use_second_read=use_second_read) 661 662 663 def _check_monitor(self, expected_pid, expected_exit_status, 664 expected_num_tests_failed): 665 if expected_pid is None: 666 self.assertEquals(self.monitor._state.process, None) 667 else: 668 self.assertEquals(self.monitor._state.process.pid, expected_pid) 669 self.assertEquals(self.monitor._state.exit_status, expected_exit_status) 670 self.assertEquals(self.monitor._state.num_tests_failed, 671 expected_num_tests_failed) 672 673 674 self.god.check_playback() 675 676 677 def _test_read_pidfile_helper(self, expected_pid, expected_exit_status, 678 expected_num_tests_failed): 679 
self.monitor._read_pidfile() 680 self._check_monitor(expected_pid, expected_exit_status, 681 expected_num_tests_failed) 682 683 684 def _get_expected_tests_failed(self, expected_exit_status): 685 if expected_exit_status is None: 686 expected_tests_failed = None 687 else: 688 expected_tests_failed = self.num_tests_failed 689 return expected_tests_failed 690 691 692 def test_read_pidfile(self): 693 self.set_not_yet_run() 694 self._test_read_pidfile_helper(None, None, None) 695 696 self.set_empty_pidfile() 697 self._test_read_pidfile_helper(None, None, None) 698 699 self.set_running() 700 self._test_read_pidfile_helper(self.pid, None, None) 701 702 self.set_complete(123) 703 self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed) 704 705 706 def test_read_pidfile_error(self): 707 self.mock_drone_manager.get_pidfile_contents.expect_call( 708 self.pidfile_id, use_second_read=False).and_return( 709 drone_manager.InvalidPidfile('error')) 710 self.assertRaises(monitor_db.PidfileRunMonitor._PidfileException, 711 self.monitor._read_pidfile) 712 self.god.check_playback() 713 714 715 def setup_is_running(self, is_running): 716 self.mock_drone_manager.is_process_running.expect_call( 717 self.process).and_return(is_running) 718 719 720 def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status, 721 expected_num_tests_failed): 722 self.monitor._get_pidfile_info() 723 self._check_monitor(expected_pid, expected_exit_status, 724 expected_num_tests_failed) 725 726 727 def test_get_pidfile_info(self): 728 """ 729 normal cases for get_pidfile_info 730 """ 731 # running 732 self.set_running() 733 self.setup_is_running(True) 734 self._test_get_pidfile_info_helper(self.pid, None, None) 735 736 # exited during check 737 self.set_running() 738 self.setup_is_running(False) 739 self.set_complete(123, use_second_read=True) # pidfile gets read again 740 self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed) 741 742 # completed 743 
self.set_complete(123) 744 self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed) 745 746 747 def test_get_pidfile_info_running_no_proc(self): 748 """ 749 pidfile shows process running, but no proc exists 750 """ 751 # running but no proc 752 self.set_running() 753 self.setup_is_running(False) 754 self.set_running(use_second_read=True) 755 email_manager.manager.enqueue_notify_email.expect_call( 756 mock.is_string_comparator(), mock.is_string_comparator()) 757 self._test_get_pidfile_info_helper(self.pid, 1, 0) 758 self.assertTrue(self.monitor.lost_process) 759 760 761 def test_get_pidfile_info_not_yet_run(self): 762 """ 763 pidfile hasn't been written yet 764 """ 765 self.set_not_yet_run() 766 self._test_get_pidfile_info_helper(None, None, None) 767 768 769 def test_process_failed_to_write_pidfile(self): 770 self.set_not_yet_run() 771 email_manager.manager.enqueue_notify_email.expect_call( 772 mock.is_string_comparator(), mock.is_string_comparator()) 773 dummy_process = drone_manager.Process('dummy', 12345) 774 self.mock_drone_manager.get_dummy_process.expect_call().and_return( 775 dummy_process) 776 self.monitor._start_time = time.time() - monitor_db.PIDFILE_TIMEOUT - 1 777 self._test_get_pidfile_info_helper(12345, 1, 0) 778 779 780class AgentTest(unittest.TestCase): 781 def setUp(self): 782 self.god = mock.mock_god() 783 784 785 def tearDown(self): 786 self.god.unstub_all() 787 788 789 def _create_mock_task(self, name): 790 task = self.god.create_mock_class(monitor_db.AgentTask, name) 791 task.host_ids = task.queue_entry_ids = [] 792 return task 793 794 795 def test_agent(self): 796 task1 = self._create_mock_task('task1') 797 task2 = self._create_mock_task('task2') 798 task3 = self._create_mock_task('task3') 799 800 task1.start.expect_call() 801 task1.is_done.expect_call().and_return(False) 802 task1.poll.expect_call() 803 task1.is_done.expect_call().and_return(True) 804 task1.is_done.expect_call().and_return(True) 805 task1.success = True 806 
807 task2.start.expect_call() 808 task2.is_done.expect_call().and_return(True) 809 task2.is_done.expect_call().and_return(True) 810 task2.success = False 811 task2.failure_tasks = [task3] 812 813 task3.start.expect_call() 814 task3.is_done.expect_call().and_return(True) 815 task3.is_done.expect_call().and_return(True) 816 task3.success = True 817 818 agent = monitor_db.Agent([task1, task2]) 819 agent.dispatcher = object() 820 agent.start() 821 while not agent.is_done(): 822 agent.tick() 823 self.god.check_playback() 824 825 826class AgentTasksTest(unittest.TestCase): 827 TEMP_DIR = '/abspath/tempdir' 828 RESULTS_DIR = '/results/dir' 829 HOSTNAME = 'myhost' 830 DUMMY_PROCESS = object() 831 HOST_PROTECTION = host_protections.default 832 PIDFILE_ID = object() 833 834 def setUp(self): 835 self.god = mock.mock_god() 836 self.god.stub_with(drone_manager.DroneManager, 'get_temporary_path', 837 mock.mock_function('get_temporary_path', 838 default_return_val='tempdir')) 839 self.god.stub_function(drone_manager.DroneManager, 840 'copy_to_results_repository') 841 self.god.stub_function(drone_manager.DroneManager, 842 'get_pidfile_id_from') 843 844 def dummy_absolute_path(self, path): 845 return '/abspath/' + path 846 self.god.stub_with(drone_manager.DroneManager, 'absolute_path', 847 dummy_absolute_path) 848 849 self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'run') 850 self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'exit_code') 851 self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'get_process') 852 self.host = self.god.create_mock_class(monitor_db.Host, 'host') 853 self.host.id = 1 854 self.host.hostname = self.HOSTNAME 855 self.host.protection = self.HOST_PROTECTION 856 self.queue_entry = self.god.create_mock_class( 857 monitor_db.HostQueueEntry, 'queue_entry') 858 self.job = self.god.create_mock_class(monitor_db.Job, 'job') 859 self.queue_entry.id = 1 860 self.queue_entry.job = self.job 861 self.queue_entry.host = self.host 862 
self.queue_entry.meta_host = None 863 864 865 def tearDown(self): 866 self.god.unstub_all() 867 868 869 def run_task(self, task, success): 870 """ 871 Do essentially what an Agent would do, but protect againt 872 infinite looping from test errors. 873 """ 874 if not getattr(task, 'agent', None): 875 task.agent = object() 876 task.start() 877 count = 0 878 while not task.is_done(): 879 count += 1 880 if count > 10: 881 print 'Task failed to finish' 882 # in case the playback has clues to why it 883 # failed 884 self.god.check_playback() 885 self.fail() 886 task.poll() 887 self.assertEquals(task.success, success) 888 889 890 def setup_run_monitor(self, exit_status, copy_log_file=True): 891 monitor_db.PidfileRunMonitor.run.expect_call( 892 mock.is_instance_comparator(list), 893 'tempdir', 894 nice_level=monitor_db.AUTOSERV_NICE_LEVEL, 895 log_file=mock.anything_comparator()) 896 monitor_db.PidfileRunMonitor.exit_code.expect_call() 897 monitor_db.PidfileRunMonitor.exit_code.expect_call().and_return( 898 exit_status) 899 900 if copy_log_file: 901 self._setup_move_logfile() 902 903 904 def _setup_move_logfile(self, include_destination=False): 905 monitor_db.PidfileRunMonitor.get_process.expect_call().and_return( 906 self.DUMMY_PROCESS) 907 if include_destination: 908 drone_manager.DroneManager.copy_to_results_repository.expect_call( 909 self.DUMMY_PROCESS, mock.is_string_comparator(), 910 destination_path=mock.is_string_comparator()) 911 else: 912 drone_manager.DroneManager.copy_to_results_repository.expect_call( 913 self.DUMMY_PROCESS, mock.is_string_comparator()) 914 915 916 def _test_repair_task_helper(self, success): 917 self.host.set_status.expect_call('Repairing') 918 if success: 919 self.setup_run_monitor(0) 920 self.host.set_status.expect_call('Ready') 921 else: 922 self.setup_run_monitor(1) 923 self.host.set_status.expect_call('Repair Failed') 924 925 task = monitor_db.RepairTask(self.host) 926 self.assertEquals(task.failure_tasks, []) 927 self.run_task(task, 
success)

        # NOTE(review): the lines above/below are the tail of a repair-task
        # helper whose header is outside this chunk — left token-identical.
        # Repair runs autoserv with the host's default protection level
        # translated to its attribute name for the --host-protection flag.
        expected_protection = host_protections.Protection.get_string(
                host_protections.default)
        expected_protection = host_protections.Protection.get_attr_name(
                expected_protection)

        # Superset check: task.cmd may carry extra flags beyond these.
        self.assertTrue(set(task.cmd) >=
                        set([monitor_db._autoserv_path, '-p', '-R', '-m',
                             self.HOSTNAME, '-r', self.TEMP_DIR,
                             '--host-protection', expected_protection]))
        self.god.check_playback()


    def test_repair_task(self):
        """RepairTask with no queue entry, both success and failure paths."""
        self._test_repair_task_helper(True)
        self._test_repair_task_helper(False)


    def test_repair_task_with_queue_entry(self):
        """A failing RepairTask with a queue entry requeues the entry and
        reports the host failure on it."""
        self.host.set_status.expect_call('Repairing')
        self.queue_entry.requeue.expect_call()
        self.setup_run_monitor(1)  # exit status 1 == repair failed
        self.host.set_status.expect_call('Repair Failed')
        self.queue_entry.handle_host_failure.expect_call()

        task = monitor_db.RepairTask(self.host, self.queue_entry)
        self.run_task(task, False)
        self.god.check_playback()


    def setup_verify_expects(self, success, use_queue_entry):
        """Record the expected call sequence for a VerifyTask run.

        @param success: whether the verify autoserv run exits 0.
        @param use_queue_entry: whether the task is bound to a queue entry.
        """
        if use_queue_entry:
            self.queue_entry.set_status.expect_call('Verifying')
        self.host.set_status.expect_call('Verifying')
        if success:
            self.setup_run_monitor(0)
            self.host.set_status.expect_call('Ready')
            if use_queue_entry:
                self.queue_entry.on_pending.expect_call()
        else:
            self.setup_run_monitor(1)
            # On failure, a non-metahost queue entry gets an execution subdir
            # and its logfile moved into place.
            if use_queue_entry and not self.queue_entry.meta_host:
                self.queue_entry.set_execution_subdir.expect_call()
                self.queue_entry.execution_tag.expect_call().and_return('tag')
                self._setup_move_logfile(include_destination=True)


    def _check_verify_failure_tasks(self, verify_task):
        """Assert the verify task carries exactly one RepairTask as its
        failure task, targeting the same host (and queue entry, if any)."""
        self.assertEquals(len(verify_task.failure_tasks), 1)
        repair_task = verify_task.failure_tasks[0]
        self.assert_(isinstance(repair_task, monitor_db.RepairTask))
        self.assertEquals(verify_task.host, repair_task.host)
        if verify_task.queue_entry:
            self.assertEquals(repair_task.queue_entry, verify_task.queue_entry)
        else:
            self.assertEquals(repair_task.queue_entry, None)


    # NOTE(review): use_meta_host is accepted but never read — metahost
    # coverage comes from test_verify_task_with_metahost mutating
    # self.queue_entry.meta_host directly. Looks vestigial; confirm.
    def _test_verify_task_helper(self, success, use_queue_entry=False,
                                 use_meta_host=False):
        """Run a VerifyTask end-to-end against recorded expectations and
        check its failure-task wiring and autoserv command line."""
        self.setup_verify_expects(success, use_queue_entry)

        if use_queue_entry:
            task = monitor_db.VerifyTask(queue_entry=self.queue_entry)
        else:
            task = monitor_db.VerifyTask(host=self.host)
        self._check_verify_failure_tasks(task)
        self.run_task(task, success)
        self.assertTrue(set(task.cmd) >=
                        set([monitor_db._autoserv_path, '-p', '-v', '-m',
                             self.HOSTNAME, '-r', self.TEMP_DIR]))
        self.god.check_playback()


    def test_verify_task_with_host(self):
        self._test_verify_task_helper(True)
        self._test_verify_task_helper(False)


    def test_verify_task_with_queue_entry(self):
        self._test_verify_task_helper(True, use_queue_entry=True)
        self._test_verify_task_helper(False, use_queue_entry=True)


    def test_verify_task_with_metahost(self):
        # Mark the mock queue entry as a metahost, then rerun the
        # queue-entry tests: the failure path skips set_execution_subdir.
        self.queue_entry.meta_host = 1
        self.test_verify_task_with_queue_entry()


    def test_abort_task(self):
        """AbortTask aborts each agent's active task and marks the queue
        entry Aborted."""
        queue_entry = self.god.create_mock_class(monitor_db.HostQueueEntry,
                                                 'queue_entry')
        queue_entry.id = 1
        queue_entry.host_id, queue_entry.job_id = 1, 2
        task = self.god.create_mock_class(monitor_db.AgentTask, 'task')
        agent = self.god.create_mock_class(monitor_db.Agent, 'agent')
        agent.active_task = task

        task.abort.expect_call()
        queue_entry.set_status.expect_call('Aborted')

        abort_task = monitor_db.AbortTask(queue_entry, [agent])
        self.run_task(abort_task, True)
        self.god.check_playback()


    def _setup_pre_parse_expects(self, autoserv_success):
        """Record expectations for FinalReparseTask startup: attach to the
        existing autoserv pidfile and read its exit code.

        @param autoserv_success: mock exit code 0 if True, else 1.
        """
        self.queue_entry.execution_tag.expect_call().and_return('tag')
        self.pidfile_monitor = monitor_db.PidfileRunMonitor.expect_new()
        self.pidfile_monitor.pidfile_id = self.PIDFILE_ID
        self.pidfile_monitor.attach_to_existing_process.expect_call('tag')
        if autoserv_success:
            code = 0
        else:
            code = 1
        self.pidfile_monitor.exit_code.expect_call().and_return(code)

        self.queue_entry.set_status.expect_call('Parsing')


    def _setup_post_parse_expects(self, autoserv_success):
        # Final status mirrors the original autoserv run, not the parse.
        if autoserv_success:
            status = 'Completed'
        else:
            status = 'Failed'
        self.queue_entry.set_status.expect_call(status)


    def setup_reparse_run_monitor(self):
        """Record expectations for the parser process itself: run paired
        with the autoserv pidfile, poll exit code twice (None then 0), then
        copy results to the repository."""
        # NOTE(review): autoserv_pidfile_id is never used — dead local.
        autoserv_pidfile_id = object()
        monitor = monitor_db.PidfileRunMonitor.expect_new()
        monitor.run.expect_call(
                mock.is_instance_comparator(list),
                'tag',
                log_file=mock.anything_comparator(),
                pidfile_name='.parser_execute',
                paired_with_pidfile=self.PIDFILE_ID)
        # First poll returns the implicit None (still running), second 0.
        monitor.exit_code.expect_call()
        monitor.exit_code.expect_call().and_return(0)
        monitor.get_process.expect_call().and_return(self.DUMMY_PROCESS)
        drone_manager.DroneManager.copy_to_results_repository.expect_call(
                self.DUMMY_PROCESS, mock.is_string_comparator())


    def _test_final_reparse_task_helper(self, autoserv_success=True):
        self._setup_pre_parse_expects(autoserv_success)
        self.setup_reparse_run_monitor()
        self._setup_post_parse_expects(autoserv_success)

        task = monitor_db.FinalReparseTask([self.queue_entry])
        self.run_task(task, True)

        self.god.check_playback()
        # Exact-match check (not a superset) on the parser command line.
        cmd = [monitor_db._parser_path, '--write-pidfile', '-l', '2', '-r',
               '-o', '/abspath/tag']
        self.assertEquals(task.cmd, cmd)


    def test_final_reparse_task(self):
        self.god.stub_class(monitor_db, 'PidfileRunMonitor')
        self._test_final_reparse_task_helper()
        self._test_final_reparse_task_helper(autoserv_success=False)


    def test_final_reparse_throttling(self):
        """When _can_run_new_parse is False the task waits; the parse only
        launches on the tick where it returns True."""
        self.god.stub_class(monitor_db, 'PidfileRunMonitor')
        self.god.stub_function(monitor_db.FinalReparseTask,
                               '_can_run_new_parse')

        self._setup_pre_parse_expects(True)
        monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return(
            False)
        monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return(
            True)
        self.setup_reparse_run_monitor()
        self._setup_post_parse_expects(True)

        task = monitor_db.FinalReparseTask([self.queue_entry])
        self.run_task(task, True)
        self.god.check_playback()


    def _test_cleanup_task_helper(self, success, use_queue_entry=False):
        """Run a CleanupTask against recorded expectations; on success the
        host goes Ready and its dirty flag is cleared, on failure a
        RepairTask is wired as the failure task."""
        if use_queue_entry:
            self.queue_entry.get_host.expect_call().and_return(self.host)
        self.host.set_status.expect_call('Cleaning')
        if success:
            self.setup_run_monitor(0)
            self.host.set_status.expect_call('Ready')
            self.host.update_field.expect_call('dirty', 0)
        else:
            self.setup_run_monitor(1)
            if use_queue_entry and not self.queue_entry.meta_host:
                self.queue_entry.set_execution_subdir.expect_call()
                self.queue_entry.execution_tag.expect_call().and_return('tag')
                self._setup_move_logfile(include_destination=True)

        if use_queue_entry:
            task = monitor_db.CleanupTask(queue_entry=self.queue_entry)
        else:
            task = monitor_db.CleanupTask(host=self.host)
        self.assertEquals(len(task.failure_tasks), 1)
        repair_task = task.failure_tasks[0]
        self.assert_(isinstance(repair_task, monitor_db.RepairTask))
        if use_queue_entry:
            self.assertEquals(repair_task.queue_entry, self.queue_entry)

        self.run_task(task, success)

        self.god.check_playback()
        self.assert_(set(task.cmd) >=
                     set([monitor_db._autoserv_path, '-p', '--cleanup', '-m',
                          self.HOSTNAME, '-r', self.TEMP_DIR]))

    def test_cleanup_task(self):
        self._test_cleanup_task_helper(True)
        self._test_cleanup_task_helper(False)


    def test_cleanup_task_with_queue_entry(self):
        self._test_cleanup_task_helper(False, True)


class JobTest(BaseSchedulerTest):
    """Tests for monitor_db.Job.run() agent/task scheduling decisions."""

    def setUp(self):
        super(JobTest, self).setUp()
        # Stub file attachment so runs don't touch the drone manager;
        # always report the same fake attached path.
        self.god.stub_with(
            drone_manager.DroneManager, 'attach_file_to_execution',
            mock.mock_function('attach_file_to_execution',
                               default_return_val='/test/path/tmp/foo'))


    def _setup_directory_expects(self, execution_subdir):
        # NOTE(review): both locals are computed and then discarded — this
        # helper currently records no expectations. Looks like leftover from
        # a removed directory-creation expectation; confirm before deleting.
        job_path = os.path.join('.', '1-my_user')
        results_dir = os.path.join(job_path, execution_subdir)


    def _test_run_helper(self, expect_agent=True, expect_starting=False,
                         expect_pending=False):
        """Fetch job 1 / queue entry 1, call job.run(), and verify the
        resulting HQE status.

        @returns the agent's task list, or None when expect_agent is False.
        """
        if expect_starting:
            expected_status = models.HostQueueEntry.Status.STARTING
        elif expect_pending:
            expected_status = models.HostQueueEntry.Status.PENDING
        else:
            expected_status = models.HostQueueEntry.Status.VERIFYING
        job = monitor_db.Job.fetch('id = 1').next()
        queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next()
        agent = job.run(queue_entry)

        self.god.check_playback()
        self.assertEquals(models.HostQueueEntry.smart_get(1).status,
                          expected_status)

        if not expect_agent:
            self.assertEquals(agent, None)
            return

        self.assert_(isinstance(agent, monitor_db.Agent))
        tasks = list(agent.queue.queue)
        return tasks


    def test_run_asynchronous(self):
        # Async job: running one entry yields a single VerifyTask for it.
        self._create_job(hosts=[1, 2])

        tasks = self._test_run_helper()

        self.assertEquals(len(tasks), 1)
        verify_task = tasks[0]

        self.assert_(isinstance(verify_task, monitor_db.VerifyTask))
        self.assertEquals(verify_task.queue_entry.id, 1)


    def test_run_asynchronous_skip_verify(self):
        # With run_verify off, the entry goes straight to a QueueTask.
        job = self._create_job(hosts=[1, 2])
        job.run_verify = False
        job.save()
        self._setup_directory_expects('host1')

        tasks = self._test_run_helper(expect_starting=True)

        self.assertEquals(len(tasks), 1)
        queue_task = tasks[0]

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        self.assertEquals(queue_task.job.id, 1)


    def test_run_synchronous_verify(self):
        # Sync job still verifies each entry individually first.
        self._create_job(hosts=[1, 2], synchronous=True)

        tasks = self._test_run_helper()
        self.assertEquals(len(tasks), 1)
        verify_task = tasks[0]

        self.assert_(isinstance(verify_task, monitor_db.VerifyTask))
        self.assertEquals(verify_task.queue_entry.id, 1)


    def test_run_synchronous_skip_verify(self):
        # Sync + no verify: entry parks in Pending (no agent) until all
        # entries are ready.
        job = self._create_job(hosts=[1, 2], synchronous=True)
        job.run_verify = False
        job.save()

        self._test_run_helper(expect_agent=False, expect_pending=True)

        queue_entry = models.HostQueueEntry.smart_get(1)
        self.assertEquals(queue_entry.status, 'Pending')


    def test_run_synchronous_ready(self):
        # All entries Pending: run() launches one QueueTask over the group.
        self._create_job(hosts=[1, 2], synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")
        self._setup_directory_expects('group0')

        tasks = self._test_run_helper(expect_starting=True)
        self.assertEquals(len(tasks), 1)
        queue_task = tasks[0]

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        self.assertEquals(queue_task.job.id, 1)
        hqe_ids = [hqe.id for hqe in queue_task.queue_entries]
        self.assertEquals(hqe_ids, [1, 2])


    def test_reboot_before_always(self):
        # RebootBefore.ALWAYS prepends a CleanupTask for the host.
        job = self._create_job(hosts=[1])
        job.reboot_before = models.RebootBefore.ALWAYS
        job.save()

        tasks = self._test_run_helper()
        self.assertEquals(len(tasks), 2)
        cleanup_task = tasks[0]
        self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask))
        self.assertEquals(cleanup_task.host.id, 1)


    def _test_reboot_before_if_dirty_helper(self, expect_reboot):
        """With RebootBefore.IF_DIRTY, a CleanupTask appears iff the host's
        dirty flag is set (the caller sets the flag beforehand)."""
        job = self._create_job(hosts=[1])
        job.reboot_before = models.RebootBefore.IF_DIRTY
        job.save()

        tasks = self._test_run_helper()
        self.assertEquals(len(tasks), expect_reboot and 2 or 1)
        if expect_reboot:
            cleanup_task = tasks[0]
            self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask))
            self.assertEquals(cleanup_task.host.id, 1)

    def test_reboot_before_if_dirty(self):
        models.Host.smart_get(1).update_object(dirty=True)
        self._test_reboot_before_if_dirty_helper(True)


    def test_reboot_before_not_dirty(self):
        models.Host.smart_get(1).update_object(dirty=False)
        self._test_reboot_before_if_dirty_helper(False)



if __name__ == '__main__':
    unittest.main()