# monitor_db_unittest.py revision 5e2bb4aa28611aaacaa8798fd07943ede1df46c6
#!/usr/bin/python
# pylint: disable-msg=C0111

import gc, time
import common
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.client.common_lib.test_utils import unittest
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import agent_task
from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
from autotest_lib.scheduler import pidfile_monitor
from autotest_lib.scheduler import scheduler_config, gc_stats, host_scheduler
from autotest_lib.scheduler import monitor_db_functional_test
from autotest_lib.scheduler import scheduler_models

# When True, the test database connection echoes its queries.
_DEBUG = False


class DummyAgentTask(object):
    """Minimal stand-in for an agent task used by DummyAgent."""
    num_processes = 1
    owner_username = 'my_user'

    def get_drone_hostnames_allowed(self):
        # No drone restriction for the dummy task.
        return None


class DummyAgent(object):
    """Minimal Agent stand-in that records whether tick() has run."""
    started = False
    _is_done = False
    host_ids = ()
    queue_entry_ids = ()

    def __init__(self):
        self.task = DummyAgentTask()

    def tick(self):
        self.started = True

    def is_done(self):
        return self._is_done

    def set_done(self, done):
        self._is_done = done


class IsRow(mock.argument_comparator):
    """Matches a database row whose first column equals row_id."""

    def __init__(self, row_id):
        self.row_id = row_id

    def is_satisfied_by(self, parameter):
        return list(parameter)[0] == self.row_id

    def __str__(self):
        return 'row with id %s' % self.row_id


class IsAgentWithTask(mock.argument_comparator):
    """Matches a monitor_db.Agent whose queue holds exactly the given task."""

    def __init__(self, task):
        self._task = task

    def is_satisfied_by(self, parameter):
        if not isinstance(parameter, monitor_db.Agent):
            return False
        tasks = list(parameter.queue.queue)
        if len(tasks) != 1:
            return False
        return tasks[0] == self._task


def _set_host_and_qe_ids(agent_or_task, id_list=None):
    """Assign the same id list to both host_ids and queue_entry_ids."""
    if id_list is None:
        id_list = []
    agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list


class BaseSchedulerTest(unittest.TestCase,
                        frontend_test_utils.FrontendTestMixin):
    """
    Shared fixture: builds a fresh in-memory test database and stubs the
    scheduler's global state (_db, drone manager paths, pending-HQE query)
    before constructing a Dispatcher.
    """
    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        self._database.execute(sql)

    def _set_monitor_stubs(self):
        # Clear the instance cache as this is a brand new database.
        scheduler_models.DBObject._clear_instance_cache()

        self._database = (
            database_connection.TranslatingDatabase.get_test_database(
                translators=monitor_db_functional_test._DB_TRANSLATORS))
        self._database.connect(db_type='django')
        self._database.debug = _DEBUG

        self.god.stub_with(monitor_db, '_db', self._database)
        self.god.stub_with(monitor_db.BaseDispatcher,
                           '_get_pending_queue_entries',
                           self._get_pending_hqes)
        self.god.stub_with(scheduler_models, '_db', self._database)
        self.god.stub_with(drone_manager.instance(), '_results_dir',
                           '/test/path')
        self.god.stub_with(drone_manager.instance(), '_temporary_directory',
                           '/test/path/tmp')

        monitor_db.initialize_globals()
        scheduler_models.initialize_globals()

    def setUp(self):
        self._frontend_common_setup()
        self._set_monitor_stubs()
        self._dispatcher = monitor_db.Dispatcher()

    def tearDown(self):
        self._database.disconnect()
        self._frontend_common_teardown()

    def _update_hqe(self, set, where=''):
        # NOTE: 'set' intentionally shadows the builtin to read like SQL.
        query = 'UPDATE afe_host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)

    def _get_pending_hqes(self):
        """Stub for BaseDispatcher._get_pending_queue_entries."""
        query_string = ('afe_jobs.priority DESC, '
                        'ifnull(nullif(host_id, NULL), host_id) DESC, '
                        'ifnull(nullif(meta_host, NULL), meta_host) DESC, '
                        'job_id')
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by=query_string))

class DispatcherSchedulingTest(BaseSchedulerTest):
    """
    Tests for Dispatcher._schedule_new_jobs, driven by a stubbed
    HostQueueEntry._do_schedule_pre_job_tasks that records each
    (job_id, host_id) scheduling instead of running real pre-job tasks.
    """
    # Rebound per-instance in setUp; the class attribute is just a default.
    _jobs_scheduled = []

    def tearDown(self):
        super(DispatcherSchedulingTest, self).tearDown()

    def _set_monitor_stubs(self):
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
            """Called by HostQueueEntry.run()."""
            self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
            queue_entry.set_status('Starting')

        self.god.stub_with(scheduler_models.HostQueueEntry,
                           '_do_schedule_pre_job_tasks',
                           hqe__do_schedule_pre_job_tasks_stub)

    def _record_job_scheduled(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record not in self._jobs_scheduled,
                     'Job %d scheduled on host %d twice' %
                     (job_id, host_id))
        self._jobs_scheduled.append(record)

    def _assert_job_scheduled_on(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record in self._jobs_scheduled,
                     'Job %d not scheduled on host %d as expected\n'
                     'Jobs scheduled: %s' %
                     (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)

    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert job was scheduled on exactly number hosts out of a set."""
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))

    def _check_for_extra_schedulings(self):
        # Every expected scheduling should have been consumed by the
        # _assert_job_scheduled_on* helpers by now.
        if len(self._jobs_scheduled) != 0:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))

    def _convert_jobs_to_metahosts(self, *job_ids):
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE afe_host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)

    def _lock_host(self, host_id):
        self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
                       str(host_id))

    def setUp(self):
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []

    def _run_scheduler(self):
        for _ in xrange(2):  # metahost scheduling can take two cycles
            self._dispatcher._schedule_new_jobs()

    def _test_basic_scheduling_helper(self, use_metahosts):
        'Basic nonmetahost scheduling'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()

    def _test_priorities_helper(self, use_metahosts):
        'Test prioritization ordering'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1, 2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._run_scheduler()
        self._assert_job_scheduled_on(4, 1)  # higher priority
        self._assert_job_scheduled_on(2, 2)  # earlier job over later
        self._check_for_extra_schedulings()

    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._run_scheduler()
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()

    def _test_hosts_idle_helper(self, use_metahosts):
        'Only idle hosts get scheduled'
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()

    def _test_obey_ACLs_helper(self, use_metahosts):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()

    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)

    def test_priorities(self):
        self._test_priorities_helper(False)

    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)

    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)

    def test_obey_ACLs(self):
        self._test_obey_ACLs_helper(False)

    def test_one_time_hosts_ignore_ACLs(self):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
        self._create_job_simple([1])
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()

    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE afe_hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)

    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)

    def test_metahost_priorities(self):
        self._test_priorities_helper(True)

    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)

    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)

    def test_metahost_obey_ACLs(self):
        self._test_obey_ACLs_helper(True)

    def _setup_test_only_if_needed_labels(self):
        # apply only_if_needed label3 to host1
        models.Host.smart_get('host1').labels.add(self.label3)
        return self._create_job_simple([1], use_metahost=True)

    def test_only_if_needed_labels_avoids_host(self):
        job = self._setup_test_only_if_needed_labels()
        # if the job doesn't depend on label3, there should be no scheduling
        self._run_scheduler()
        self._check_for_extra_schedulings()

    def test_only_if_needed_labels_schedules(self):
        job = self._setup_test_only_if_needed_labels()
        job.dependency_labels.add(self.label3)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()

    def test_only_if_needed_labels_via_metahost(self):
        job = self._setup_test_only_if_needed_labels()
        job.dependency_labels.add(self.label3)
        # should also work if the metahost is the only_if_needed label
        self._do_query('DELETE FROM afe_jobs_dependency_labels')
        self._create_job(metahosts=[3])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()

    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()

    def test_metahosts_obey_blocks(self):
        """
        Metahosts can't get scheduled on hosts already scheduled for
        that job.
        """
        self._create_job(metahosts=[1], hosts=[1])
        # make the nonmetahost entry complete, so the metahost can try
        # to get scheduled
        self._update_hqe(set='complete = 1', where='host_id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

    # TODO(gps): These should probably live in their own TestCase class
    # specific to testing HostScheduler methods directly.  It was convenient
    # to put it here for now to share existing test environment setup code.
    def test_HostScheduler_check_atomic_group_labels(self):
        normal_job = self._create_job(metahosts=[0])
        atomic_job = self._create_job(atomic_group=1)
        # Indirectly initialize the internal state of the host scheduler.
        self._dispatcher._refresh_pending_queue_entries()

        atomic_hqe = scheduler_models.HostQueueEntry.fetch(
            where='job_id=%d' % atomic_job.id)[0]
        normal_hqe = scheduler_models.HostQueueEntry.fetch(
            where='job_id=%d' % normal_job.id)[0]

        host_scheduler = self._dispatcher._host_scheduler
        self.assertTrue(host_scheduler._check_atomic_group_labels(
            [self.label4.id], atomic_hqe))
        self.assertFalse(host_scheduler._check_atomic_group_labels(
            [self.label4.id], normal_hqe))
        self.assertFalse(host_scheduler._check_atomic_group_labels(
            [self.label5.id, self.label6.id, self.label7.id], normal_hqe))
        self.assertTrue(host_scheduler._check_atomic_group_labels(
            [self.label4.id, self.label6.id], atomic_hqe))
        self.assertTrue(host_scheduler._check_atomic_group_labels(
            [self.label4.id, self.label5.id],
            atomic_hqe))

    # TODO: Revive this test.
434# def test_HostScheduler_get_host_atomic_group_id(self): 435# job = self._create_job(metahosts=[self.label6.id]) 436# queue_entry = scheduler_models.HostQueueEntry.fetch( 437# where='job_id=%d' % job.id)[0] 438# # Indirectly initialize the internal state of the host scheduler. 439# self._dispatcher._refresh_pending_queue_entries() 440# 441# # Test the host scheduler 442# host_scheduler = self._dispatcher._host_scheduler 443# 444# 445# # Two labels each in a different atomic group. This should log an 446# # error and continue. 447# orig_logging_error = logging.error 448# def mock_logging_error(message, *args): 449# mock_logging_error._num_calls += 1 450# # Test the logging call itself, we just wrapped it to count it. 451# orig_logging_error(message, *args) 452# mock_logging_error._num_calls = 0 453# self.god.stub_with(logging, 'error', mock_logging_error) 454# host_scheduler.refresh([]) 455# self.assertNotEquals(None, host_scheduler._get_host_atomic_group_id( 456# [self.label4.id, self.label8.id], queue_entry)) 457# self.assertTrue(mock_logging_error._num_calls > 0) 458# self.god.unstub(logging, 'error') 459# 460# # Two labels both in the same atomic group, this should not raise an 461# # error, it will merely cause the job to schedule on the intersection. 462# self.assertEquals(1, host_scheduler._get_host_atomic_group_id( 463# [self.label4.id, self.label5.id])) 464# 465# self.assertEquals(None, host_scheduler._get_host_atomic_group_id([])) 466# self.assertEquals(None, host_scheduler._get_host_atomic_group_id( 467# [self.label3.id, self.label7.id, self.label6.id])) 468# self.assertEquals(1, host_scheduler._get_host_atomic_group_id( 469# [self.label4.id, self.label7.id, self.label6.id])) 470# self.assertEquals(1, host_scheduler._get_host_atomic_group_id( 471# [self.label7.id, self.label5.id])) 472 473 474 def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self): 475 # Create a job scheduled to run on label6. 
476 self._create_job(metahosts=[self.label6.id]) 477 self._run_scheduler() 478 # label6 only has hosts that are in atomic groups associated with it, 479 # there should be no scheduling. 480 self._check_for_extra_schedulings() 481 482 483 def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self): 484 # Create a job scheduled to run on label5. This is an atomic group 485 # label but this job does not request atomic group scheduling. 486 self._create_job(metahosts=[self.label5.id]) 487 self._run_scheduler() 488 # label6 only has hosts that are in atomic groups associated with it, 489 # there should be no scheduling. 490 self._check_for_extra_schedulings() 491 492 493 def test_atomic_group_scheduling_basics(self): 494 # Create jobs scheduled to run on an atomic group. 495 job_a = self._create_job(synchronous=True, metahosts=[self.label4.id], 496 atomic_group=1) 497 job_b = self._create_job(synchronous=True, metahosts=[self.label5.id], 498 atomic_group=1) 499 self._run_scheduler() 500 # atomic_group.max_number_of_machines was 2 so we should run on 2. 501 self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2) 502 self._assert_job_scheduled_on(job_b.id, 8) # label5 503 self._assert_job_scheduled_on(job_b.id, 9) # label5 504 self._check_for_extra_schedulings() 505 506 # The three host label4 atomic group still has one host available. 507 # That means a job with a synch_count of 1 asking to be scheduled on 508 # the atomic group can still use the final machine. 509 # 510 # This may seem like a somewhat odd use case. It allows the use of an 511 # atomic group as a set of machines to run smaller jobs within (a set 512 # of hosts configured for use in network tests with eachother perhaps?) 
513 onehost_job = self._create_job(atomic_group=1) 514 self._run_scheduler() 515 self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1) 516 self._check_for_extra_schedulings() 517 518 # No more atomic groups have hosts available, no more jobs should 519 # be scheduled. 520 self._create_job(atomic_group=1) 521 self._run_scheduler() 522 self._check_for_extra_schedulings() 523 524 525 def test_atomic_group_scheduling_obeys_acls(self): 526 # Request scheduling on a specific atomic label but be denied by ACLs. 527 self._do_query('DELETE FROM afe_acl_groups_hosts ' 528 'WHERE host_id in (8,9)') 529 job = self._create_job(metahosts=[self.label5.id], atomic_group=1) 530 self._run_scheduler() 531 self._check_for_extra_schedulings() 532 533 534 def test_atomic_group_scheduling_dependency_label_exclude(self): 535 # A dependency label that matches no hosts in the atomic group. 536 job_a = self._create_job(atomic_group=1) 537 job_a.dependency_labels.add(self.label3) 538 self._run_scheduler() 539 self._check_for_extra_schedulings() 540 541 542 def test_atomic_group_scheduling_metahost_dependency_label_exclude(self): 543 # A metahost and dependency label that excludes too many hosts. 544 job_b = self._create_job(synchronous=True, metahosts=[self.label4.id], 545 atomic_group=1) 546 job_b.dependency_labels.add(self.label7) 547 self._run_scheduler() 548 self._check_for_extra_schedulings() 549 550 551 def test_atomic_group_scheduling_dependency_label_match(self): 552 # A dependency label that exists on enough atomic group hosts in only 553 # one of the two atomic group labels. 554 job_c = self._create_job(synchronous=True, atomic_group=1) 555 job_c.dependency_labels.add(self.label7) 556 self._run_scheduler() 557 self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2) 558 self._check_for_extra_schedulings() 559 560 561 def test_atomic_group_scheduling_no_metahost(self): 562 # Force it to schedule on the other group for a reliable test. 
563 self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9') 564 # An atomic job without a metahost. 565 job = self._create_job(synchronous=True, atomic_group=1) 566 self._run_scheduler() 567 self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2) 568 self._check_for_extra_schedulings() 569 570 571 def test_atomic_group_scheduling_partial_group(self): 572 # Make one host in labels[3] unavailable so that there are only two 573 # hosts left in the group. 574 self._do_query('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5') 575 job = self._create_job(synchronous=True, metahosts=[self.label4.id], 576 atomic_group=1) 577 self._run_scheduler() 578 # Verify that it was scheduled on the 2 ready hosts in that group. 579 self._assert_job_scheduled_on(job.id, 6) 580 self._assert_job_scheduled_on(job.id, 7) 581 self._check_for_extra_schedulings() 582 583 584 def test_atomic_group_scheduling_not_enough_available(self): 585 # Mark some hosts in each atomic group label as not usable. 586 # One host running, another invalid in the first group label. 587 self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=5') 588 self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=6') 589 # One host invalid in the second group label. 590 self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9') 591 # Nothing to schedule when no group label has enough (2) good hosts.. 592 self._create_job(atomic_group=1, synchronous=True) 593 self._run_scheduler() 594 # There are not enough hosts in either atomic group, 595 # No more scheduling should occur. 596 self._check_for_extra_schedulings() 597 598 # Now create an atomic job that has a synch count of 1. It should 599 # schedule on exactly one of the hosts. 
600 onehost_job = self._create_job(atomic_group=1) 601 self._run_scheduler() 602 self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1) 603 604 605 def test_atomic_group_scheduling_no_valid_hosts(self): 606 self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)') 607 self._create_job(synchronous=True, metahosts=[self.label5.id], 608 atomic_group=1) 609 self._run_scheduler() 610 # no hosts in the selected group and label are valid. no schedulings. 611 self._check_for_extra_schedulings() 612 613 614 def test_atomic_group_scheduling_metahost_works(self): 615 # Test that atomic group scheduling also obeys metahosts. 616 self._create_job(metahosts=[0], atomic_group=1) 617 self._run_scheduler() 618 # There are no atomic group hosts that also have that metahost. 619 self._check_for_extra_schedulings() 620 621 job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1) 622 self._run_scheduler() 623 self._assert_job_scheduled_on(job_b.id, 8) 624 self._assert_job_scheduled_on(job_b.id, 9) 625 self._check_for_extra_schedulings() 626 627 628 def test_atomic_group_skips_ineligible_hosts(self): 629 # Test hosts marked ineligible for this job are not eligible. 630 # How would this ever happen anyways? 631 job = self._create_job(metahosts=[self.label4.id], atomic_group=1) 632 models.IneligibleHostQueue.objects.create(job=job, host_id=5) 633 models.IneligibleHostQueue.objects.create(job=job, host_id=6) 634 models.IneligibleHostQueue.objects.create(job=job, host_id=7) 635 self._run_scheduler() 636 # No scheduling should occur as all desired hosts were ineligible. 637 self._check_for_extra_schedulings() 638 639 640 def test_atomic_group_scheduling_fail(self): 641 # If synch_count is > the atomic group number of machines, the job 642 # should be aborted immediately. 
643 model_job = self._create_job(synchronous=True, atomic_group=1) 644 model_job.synch_count = 4 645 model_job.save() 646 job = scheduler_models.Job(id=model_job.id) 647 self._run_scheduler() 648 self._check_for_extra_schedulings() 649 queue_entries = job.get_host_queue_entries() 650 self.assertEqual(1, len(queue_entries)) 651 self.assertEqual(queue_entries[0].status, 652 models.HostQueueEntry.Status.ABORTED) 653 654 655 def test_atomic_group_no_labels_no_scheduling(self): 656 # Never schedule on atomic groups marked invalid. 657 job = self._create_job(metahosts=[self.label5.id], synchronous=True, 658 atomic_group=1) 659 # Deleting an atomic group via the frontend marks it invalid and 660 # removes all label references to the group. The job now references 661 # an invalid atomic group with no labels associated with it. 662 self.label5.atomic_group.invalid = True 663 self.label5.atomic_group.save() 664 self.label5.atomic_group = None 665 self.label5.save() 666 667 self._run_scheduler() 668 self._check_for_extra_schedulings() 669 670 671 def test_schedule_directly_on_atomic_group_host_fail(self): 672 # Scheduling a job directly on hosts in an atomic group must 673 # fail to avoid users inadvertently holding up the use of an 674 # entire atomic group by using the machines individually. 675 job = self._create_job(hosts=[5]) 676 self._run_scheduler() 677 self._check_for_extra_schedulings() 678 679 680 def test_schedule_directly_on_atomic_group_host(self): 681 # Scheduling a job directly on one host in an atomic group will 682 # work when the atomic group is listed on the HQE in addition 683 # to the host (assuming the sync count is 1). 
684 job = self._create_job(hosts=[5], atomic_group=1) 685 self._run_scheduler() 686 self._assert_job_scheduled_on(job.id, 5) 687 self._check_for_extra_schedulings() 688 689 690 def test_schedule_directly_on_atomic_group_hosts_sync2(self): 691 job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True) 692 self._run_scheduler() 693 self._assert_job_scheduled_on(job.id, 5) 694 self._assert_job_scheduled_on(job.id, 8) 695 self._check_for_extra_schedulings() 696 697 698 def test_schedule_directly_on_atomic_group_hosts_wrong_group(self): 699 job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True) 700 self._run_scheduler() 701 self._check_for_extra_schedulings() 702 703 704 def test_only_schedule_queued_entries(self): 705 self._create_job(metahosts=[1]) 706 self._update_hqe(set='active=1, host_id=2') 707 self._run_scheduler() 708 self._check_for_extra_schedulings() 709 710 711 def test_no_ready_hosts(self): 712 self._create_job(hosts=[1]) 713 self._do_query('UPDATE afe_hosts SET status="Repair Failed"') 714 self._run_scheduler() 715 self._check_for_extra_schedulings() 716 717 718 def test_garbage_collection(self): 719 self.god.stub_with(self._dispatcher, '_seconds_between_garbage_stats', 720 999999) 721 self.god.stub_function(gc, 'collect') 722 self.god.stub_function(gc_stats, '_log_garbage_collector_stats') 723 gc.collect.expect_call().and_return(0) 724 gc_stats._log_garbage_collector_stats.expect_call() 725 # Force a garbage collection run 726 self._dispatcher._last_garbage_stats_time = 0 727 self._dispatcher._garbage_collection() 728 # The previous call should have reset the time, it won't do anything 729 # the second time. If it does, we'll get an unexpected call. 
730 self._dispatcher._garbage_collection() 731 732 733 734class DispatcherThrottlingTest(BaseSchedulerTest): 735 """ 736 Test that the dispatcher throttles: 737 * total number of running processes 738 * number of processes started per cycle 739 """ 740 _MAX_RUNNING = 3 741 _MAX_STARTED = 2 742 743 def setUp(self): 744 super(DispatcherThrottlingTest, self).setUp() 745 scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING 746 scheduler_config.config.max_processes_started_per_cycle = ( 747 self._MAX_STARTED) 748 749 def fake_max_runnable_processes(fake_self, username, 750 drone_hostnames_allowed): 751 running = sum(agent.task.num_processes 752 for agent in self._agents 753 if agent.started and not agent.is_done()) 754 return self._MAX_RUNNING - running 755 self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes', 756 fake_max_runnable_processes) 757 758 759 def _setup_some_agents(self, num_agents): 760 self._agents = [DummyAgent() for i in xrange(num_agents)] 761 self._dispatcher._agents = list(self._agents) 762 763 764 def _run_a_few_cycles(self): 765 for i in xrange(4): 766 self._dispatcher._handle_agents() 767 768 769 def _assert_agents_started(self, indexes, is_started=True): 770 for i in indexes: 771 self.assert_(self._agents[i].started == is_started, 772 'Agent %d %sstarted' % 773 (i, is_started and 'not ' or '')) 774 775 776 def _assert_agents_not_started(self, indexes): 777 self._assert_agents_started(indexes, False) 778 779 780 def test_throttle_total(self): 781 self._setup_some_agents(4) 782 self._run_a_few_cycles() 783 self._assert_agents_started([0, 1, 2]) 784 self._assert_agents_not_started([3]) 785 786 787 def test_throttle_per_cycle(self): 788 self._setup_some_agents(3) 789 self._dispatcher._handle_agents() 790 self._assert_agents_started([0, 1]) 791 self._assert_agents_not_started([2]) 792 793 794 def test_throttle_with_synchronous(self): 795 self._setup_some_agents(2) 796 self._agents[0].task.num_processes = 3 797 
self._run_a_few_cycles() 798 self._assert_agents_started([0]) 799 self._assert_agents_not_started([1]) 800 801 802 def test_large_agent_starvation(self): 803 """ 804 Ensure large agents don't get starved by lower-priority agents. 805 """ 806 self._setup_some_agents(3) 807 self._agents[1].task.num_processes = 3 808 self._run_a_few_cycles() 809 self._assert_agents_started([0]) 810 self._assert_agents_not_started([1, 2]) 811 812 self._agents[0].set_done(True) 813 self._run_a_few_cycles() 814 self._assert_agents_started([1]) 815 self._assert_agents_not_started([2]) 816 817 818 def test_zero_process_agent(self): 819 self._setup_some_agents(5) 820 self._agents[4].task.num_processes = 0 821 self._run_a_few_cycles() 822 self._assert_agents_started([0, 1, 2, 4]) 823 self._assert_agents_not_started([3]) 824 825 826class PidfileRunMonitorTest(unittest.TestCase): 827 execution_tag = 'test_tag' 828 pid = 12345 829 process = drone_manager.Process('myhost', pid) 830 num_tests_failed = 1 831 832 def setUp(self): 833 self.god = mock.mock_god() 834 self.mock_drone_manager = self.god.create_mock_class( 835 drone_manager.DroneManager, 'drone_manager') 836 self.god.stub_with(pidfile_monitor, '_drone_manager', 837 self.mock_drone_manager) 838 self.god.stub_function(email_manager.manager, 'enqueue_notify_email') 839 self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs', 840 self._mock_get_pidfile_timeout_secs) 841 842 self.pidfile_id = object() 843 844 (self.mock_drone_manager.get_pidfile_id_from 845 .expect_call(self.execution_tag, 846 pidfile_name=drone_manager.AUTOSERV_PID_FILE) 847 .and_return(self.pidfile_id)) 848 849 self.monitor = pidfile_monitor.PidfileRunMonitor() 850 self.monitor.attach_to_existing_process(self.execution_tag) 851 852 def tearDown(self): 853 self.god.unstub_all() 854 855 856 def _mock_get_pidfile_timeout_secs(self): 857 return 300 858 859 860 def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None, 861 use_second_read=False): 862 contents = 
drone_manager.PidfileContents()
        if pid is not None:
            contents.process = drone_manager.Process('myhost', pid)
        contents.exit_status = exit_code
        contents.num_tests_failed = tests_failed
        self.mock_drone_manager.get_pidfile_contents.expect_call(
                self.pidfile_id, use_second_read=use_second_read).and_return(
                contents)


    def set_not_yet_run(self):
        """Expect a pidfile read that yields no recorded process at all."""
        self.setup_pidfile()


    def set_empty_pidfile(self):
        """Expect a pidfile read that yields empty contents (same setup as
        set_not_yet_run; kept separate for readable test intent)."""
        self.setup_pidfile()


    def set_running(self, use_second_read=False):
        """Expect a pidfile read showing a pid but no exit status yet."""
        self.setup_pidfile(self.pid, use_second_read=use_second_read)


    def set_complete(self, error_code, use_second_read=False):
        """Expect a pidfile read showing pid, exit code and failed-test count."""
        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                           use_second_read=use_second_read)


    def _check_monitor(self, expected_pid, expected_exit_status,
                       expected_num_tests_failed):
        """Assert the monitor's parsed state matches expectations and verify
        that all queued drone_manager expectations were consumed."""
        if expected_pid is None:
            self.assertEquals(self.monitor._state.process, None)
        else:
            self.assertEquals(self.monitor._state.process.pid, expected_pid)
        self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
        self.assertEquals(self.monitor._state.num_tests_failed,
                          expected_num_tests_failed)


        self.god.check_playback()


    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
        """Drive a single _read_pidfile() and check the resulting state."""
        self.monitor._read_pidfile()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def _get_expected_tests_failed(self, expected_exit_status):
        """The failed-test count is only meaningful once an exit status
        exists; before that it is None."""
        if expected_exit_status is None:
            expected_tests_failed = None
        else:
            expected_tests_failed = self.num_tests_failed
        return expected_tests_failed


    def test_read_pidfile(self):
        """_read_pidfile() across not-run, empty, running and complete
        pidfile contents."""
        self.set_not_yet_run()
        self._test_read_pidfile_helper(None, None, None)

        self.set_empty_pidfile()
        self._test_read_pidfile_helper(None, None, None)

        self.set_running()
        self._test_read_pidfile_helper(self.pid, None, None)

        self.set_complete(123)
        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)


    def test_read_pidfile_error(self):
        """An InvalidPidfile from the drone manager surfaces as
        _PidfileException."""
        self.mock_drone_manager.get_pidfile_contents.expect_call(
                self.pidfile_id, use_second_read=False).and_return(
                drone_manager.InvalidPidfile('error'))
        self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
                          self.monitor._read_pidfile)
        self.god.check_playback()


    def setup_is_running(self, is_running):
        """Expect a liveness check of the monitored process."""
        self.mock_drone_manager.is_process_running.expect_call(
                self.process).and_return(is_running)


    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                      expected_num_tests_failed):
        """Drive a single _get_pidfile_info() and check the resulting state."""
        self.monitor._get_pidfile_info()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def test_get_pidfile_info(self):
        """
        normal cases for get_pidfile_info
        """
        # running
        self.set_running()
        self.setup_is_running(True)
        self._test_get_pidfile_info_helper(self.pid, None, None)

        # exited during check
        self.set_running()
        self.setup_is_running(False)
        self.set_complete(123, use_second_read=True) # pidfile gets read again
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

        # completed
        self.set_complete(123)
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)


    def test_get_pidfile_info_running_no_proc(self):
        """
        pidfile shows process running, but no proc exists
        """
        # running but no proc
        self.set_running()
        self.setup_is_running(False)
        self.set_running(use_second_read=True)
        # A notification email is expected when the process is lost.
        email_manager.manager.enqueue_notify_email.expect_call(
                mock.is_string_comparator(), mock.is_string_comparator())
        self._test_get_pidfile_info_helper(self.pid, 1, 0)
        self.assertTrue(self.monitor.lost_process)


    def test_get_pidfile_info_not_yet_run(self):
        """
        pidfile hasn't been written yet
        """
        self.set_not_yet_run()
        self._test_get_pidfile_info_helper(None, None, None)


    def test_process_failed_to_write_pidfile(self):
        """A process that never writes its pidfile within the timeout is
        treated as lost (exit status 1, 0 tests failed) and a notification
        email is queued."""
        self.set_not_yet_run()
        email_manager.manager.enqueue_notify_email.expect_call(
                mock.is_string_comparator(), mock.is_string_comparator())
        # Push the start time past the pidfile timeout so the monitor gives up.
        self.monitor._start_time = (time.time() -
                pidfile_monitor._get_pidfile_timeout_secs() - 1)
        self._test_get_pidfile_info_helper(None, 1, 0)
        self.assertTrue(self.monitor.lost_process)


class AgentTest(unittest.TestCase):
    """Tests for monitor_db.Agent tick/abort behavior against a mock task."""

    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')


    def tearDown(self):
        self.god.unstub_all()


    def _create_mock_task(self, name):
        """Build a mock AgentTask with one process and no host/HQE ids."""
        task = self.god.create_mock_class(agent_task.AgentTask, name)
        task.num_processes = 1
        _set_host_and_qe_ids(task)
        return task

    def _create_agent(self, task):
        """Wrap a task in a real Agent attached to the mock dispatcher."""
        agent = monitor_db.Agent(task)
        agent.dispatcher = self._dispatcher
        return agent


    def _finish_agent(self, agent):
        """Tick the agent until it reports done."""
        while not agent.is_done():
            agent.tick()


    def test_agent_abort(self):
        """Aborting after the task has started polls it once then aborts it."""
        task = self._create_mock_task('task')
        task.poll.expect_call()
        task.is_done.expect_call().and_return(False)
        task.abort.expect_call()
        task.aborted = True

        agent = self._create_agent(task)
        agent.tick()
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def _test_agent_abort_before_started_helper(self, ignore_abort=False):
        """Abort an agent before its first tick.  If the task ignores the
        abort (aborted stays False) it must still be polled to completion."""
        task = self._create_mock_task('task')
        task.abort.expect_call()
        if ignore_abort:
            task.aborted = False
            task.poll.expect_call()
            task.is_done.expect_call().and_return(True)
            task.success = True
        else:
            task.aborted = True

        agent = self._create_agent(task)
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def test_agent_abort_before_started(self):
        self._test_agent_abort_before_started_helper()
        self._test_agent_abort_before_started_helper(True)


class JobSchedulingTest(BaseSchedulerTest):
    """End-to-end tests of Job.run_if_ready() through the dispatcher."""

    def _test_run_helper(self, expect_agent=True, expect_starting=False,
                         expect_pending=False):
        """Run job 1 / HQE 1 through run_if_ready() and the dispatcher's
        scheduling passes, assert the resulting HQE status, and return the
        scheduled agent's task (or None if no agent is expected)."""
        if expect_starting:
            expected_status = models.HostQueueEntry.Status.STARTING
        elif expect_pending:
            expected_status = models.HostQueueEntry.Status.PENDING
        else:
            expected_status = models.HostQueueEntry.Status.VERIFYING
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)

        self.god.check_playback()

        self._dispatcher._schedule_delay_tasks()
        self._dispatcher._schedule_running_host_queue_entries()
        agent = self._dispatcher._agents[0]

        actual_status = models.HostQueueEntry.smart_get(1).status
        self.assertEquals(expected_status, actual_status)

        if not expect_agent:
            self.assertEquals(agent, None)
            return

        self.assert_(isinstance(agent, monitor_db.Agent))
        self.assert_(agent.task)
        return agent.task


    def test_run_if_ready_delays(self):
        """Exercise the atomic-group ready-delay path of run_if_ready()."""
        # Also tests Job.run_with_ready_delay() on atomic group jobs.
        django_job = self._create_job(hosts=[5, 6], atomic_group=1)
        job = scheduler_models.Job(django_job.id)
        self.assertEqual(1, job.synch_count)
        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
        self.assertEqual(2, len(django_hqes))
        self.assertEqual(2, django_hqes[0].atomic_group.max_number_of_machines)

        def set_hqe_status(django_hqe, status):
            # Keep the Django HQE and the scheduler-side host status in sync.
            django_hqe.status = status
            django_hqe.save()
            scheduler_models.HostQueueEntry(django_hqe.id).host.set_status(status)

        # An initial state, our synch_count is 1
        set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.VERIFYING)
        set_hqe_status(django_hqes[1], models.HostQueueEntry.Status.PENDING)

        # So that we don't depend on the config file value during the test.
        self.assert_(scheduler_config.config
                     .secs_to_wait_for_atomic_group_hosts is not None)
        self.god.stub_with(scheduler_config.config,
                           'secs_to_wait_for_atomic_group_hosts', 123456)

        # Get the pending one as a scheduler_models.HostQueueEntry object.
        hqe = scheduler_models.HostQueueEntry(django_hqes[1].id)
        self.assert_(not job._delay_ready_task)
        self.assertTrue(job.is_ready())

        # Ready with one pending, one verifying and an atomic group should
        # result in a DelayCallTask to re-check if we're ready a while later.
        job.run_if_ready(hqe)
        self.assertEquals('Waiting', hqe.status)
        self._dispatcher._schedule_delay_tasks()
        self.assertEquals('Pending', hqe.status)
        agent = self._dispatcher._agents[0]
        self.assert_(job._delay_ready_task)
        self.assert_(isinstance(agent, monitor_db.Agent))
        self.assert_(agent.task)
        delay_task = agent.task
        self.assert_(isinstance(delay_task, scheduler_models.DelayedCallTask))
        self.assert_(not delay_task.is_done())

        self.god.stub_function(delay_task, 'abort')

        self.god.stub_function(job, 'run')

        self.god.stub_function(job, '_pending_count')
        self.god.stub_with(job, 'synch_count', 9)
        self.god.stub_function(job, 'request_abort')

        # Test that the DelayedCallTask's callback queued up above does the
        # correct thing and does not call run if there are not enough hosts
        # in pending after the delay.
        job._pending_count.expect_call().and_return(0)
        job.request_abort.expect_call()
        delay_task._callback()
        self.god.check_playback()

        # Test that the DelayedCallTask's callback queued up above does the
        # correct thing and returns the Agent returned by job.run() if
        # there are still enough hosts pending after the delay.
        job.synch_count = 4
        job._pending_count.expect_call().and_return(4)
        job.run.expect_call(hqe)
        delay_task._callback()
        self.god.check_playback()

        job._pending_count.expect_call().and_return(4)

        # Adjust the delay deadline so that enough time has passed.
        job._delay_ready_task.end_time = time.time() - 111111
        job.run.expect_call(hqe)
        # ...the delay_expired condition should cause us to call run()
        self._dispatcher._handle_agents()
        self.god.check_playback()
        delay_task.success = False

        # Adjust the delay deadline back so that enough time has not passed.
        job._delay_ready_task.end_time = time.time() + 111111
        self._dispatcher._handle_agents()
        self.god.check_playback()

        # Now max_number_of_machines HQEs are in pending state. Remaining
        # delay will now be ignored.
        other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
        self.god.unstub(job, 'run')
        self.god.unstub(job, '_pending_count')
        self.god.unstub(job, 'synch_count')
        self.god.unstub(job, 'request_abort')
        # ...the over_max_threshold test should cause us to call run()
        delay_task.abort.expect_call()
        other_hqe.on_pending()
        self.assertEquals('Starting', other_hqe.status)
        self.assertEquals('Starting', hqe.status)
        self.god.stub_function(job, 'run')
        self.god.unstub(delay_task, 'abort')

        hqe.set_status('Pending')
        other_hqe.set_status('Pending')
        # Now we're not over the max for the atomic group. But all assigned
        # hosts are in pending state. over_max_threshold should make us run().
        hqe.atomic_group.max_number_of_machines += 1
        hqe.atomic_group.save()
        job.run.expect_call(hqe)
        hqe.on_pending()
        self.god.check_playback()
        # Restore the atomic group limit for the rest of the test.
        hqe.atomic_group.max_number_of_machines -= 1
        hqe.atomic_group.save()

        other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)
        self.assertTrue(hqe.job is other_hqe.job)
        # DBObject classes should reuse instances so these should be the same.
        self.assertEqual(job, other_hqe.job)
        self.assertEqual(other_hqe.job, hqe.job)
        # Be sure our delay was not lost during the other_hqe construction.
        self.assertEqual(job._delay_ready_task, delay_task)
        self.assert_(job._delay_ready_task)
        self.assertFalse(job._delay_ready_task.is_done())
        self.assertFalse(job._delay_ready_task.aborted)

        # We want the real run() to be called below.
        self.god.unstub(job, 'run')

        # We pass in the other HQE this time the same way it would happen
        # for real when one host finishes verifying and enters pending.
        job.run_if_ready(other_hqe)

        # The delayed task must be aborted by the actual run() call above.
        self.assertTrue(job._delay_ready_task.aborted)
        self.assertFalse(job._delay_ready_task.success)
        self.assertTrue(job._delay_ready_task.is_done())

        # Check that job run() and _finish_run() were called by the above:
        self._dispatcher._schedule_running_host_queue_entries()
        agent = self._dispatcher._agents[0]
        self.assert_(agent.task)
        task = agent.task
        self.assert_(isinstance(task, monitor_db.QueueTask))
        # Requery these hqes in order to verify the status from the DB.
        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
        for entry in django_hqes:
            self.assertEqual(models.HostQueueEntry.Status.STARTING,
                             entry.status)

        # We're already running, but more calls to run_with_ready_delay can
        # continue to come in due to straggler hosts enter Pending. Make
        # sure we don't do anything.
        self.god.stub_function(job, 'run')
        job.run_with_ready_delay(hqe)
        self.god.check_playback()
        self.god.unstub(job, 'run')


    def test_run_synchronous_atomic_group_ready(self):
        """A pending synchronous atomic-group job starts a QueueTask."""
        self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        # Atomic group jobs that do not depend on a specific label in the
        # atomic group will use the atomic group name as their group name.
        self.assertEquals(queue_task.queue_entries[0].get_group_name(),
                          'atomic1')


    def test_run_synchronous_atomic_group_with_label_ready(self):
        """An atomic-group job with a dependency label groups by the label."""
        job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
        job.dependency_labels.add(self.label4)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        # Atomic group jobs that also specify a label in the atomic group
        # will use the label name as their group name.
        self.assertEquals(queue_task.queue_entries[0].get_group_name(),
                          'label4')


    def test_run_synchronous_ready(self):
        """A pending synchronous job starts a QueueTask covering both HQEs."""
        self._create_job(hosts=[1, 2], synchronous=True)
        self._update_hqe("status='Pending', execution_subdir=''")

        queue_task = self._test_run_helper(expect_starting=True)

        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
        self.assertEquals(queue_task.job.id, 1)
        hqe_ids = [hqe.id for hqe in queue_task.queue_entries]
        self.assertEquals(hqe_ids, [1, 2])


    def test_schedule_running_host_queue_entries_fail(self):
        """Scheduling onto a host that already has an agent registered must
        raise SchedulerError."""
        self._create_job(hosts=[2])
        self._update_hqe("status='%s', execution_subdir=''" %
                         models.HostQueueEntry.Status.PENDING)
        job = scheduler_models.Job.fetch('id = 1')[0]
        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]
        assert queue_entry.job is job
        job.run_if_ready(queue_entry)
        self.assertEqual(queue_entry.status,
                         models.HostQueueEntry.Status.STARTING)
        self.assert_(queue_entry.execution_subdir)
        self.god.check_playback()

        class dummy_test_agent(object):
            task = 'dummy_test_agent'
        # Pre-register a fake agent on the HQE's host to force the conflict.
        self._dispatcher._register_agent_for_ids(
                self._dispatcher._host_agents, [queue_entry.host.id],
                dummy_test_agent)

        # Attempted to schedule on a host that already has an agent.
        self.assertRaises(host_scheduler.SchedulerError,
                          self._dispatcher._schedule_running_host_queue_entries)


    def test_schedule_hostless_job(self):
        """A hostless job is scheduled exactly once; a second scheduling pass
        is a no-op."""
        job = self._create_job(hostless=True)
        self.assertEqual(1, job.hostqueueentry_set.count())
        hqe_query = scheduler_models.HostQueueEntry.fetch(
                'id = %s' % job.hostqueueentry_set.all()[0].id)
        self.assertEqual(1, len(hqe_query))
        hqe = hqe_query[0]

        self.assertEqual(models.HostQueueEntry.Status.QUEUED, hqe.status)
        self.assertEqual(0, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual(1, len(self._dispatcher._agents))

        self._dispatcher._schedule_new_jobs()

        # No change to previously scheduled hostless job, and no additional
        # agent.
        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual(1, len(self._dispatcher._agents))


class TopLevelFunctionsTest(unittest.TestCase):
    """Tests for module-level helpers in monitor_db."""

    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()


    def test_autoserv_command_line(self):
        """_autoserv_command_line() builds the expected argument set, with
        and without a queue entry / verbose flag."""
        machines = 'abcd12,efgh34'
        extra_args = ['-Z', 'hello']
        # Compared as sets: only argument presence is asserted, not order.
        expected_command_line_base = set((monitor_db._autoserv_path, '-p',
                                          '-m', machines, '-r',
                                          drone_manager.WORKING_DIRECTORY))

        expected_command_line = expected_command_line_base.union(
                ['--verbose']).union(extra_args)
        command_line = set(
                monitor_db._autoserv_command_line(machines, extra_args))
        self.assertEqual(expected_command_line, command_line)

        class FakeJob(object):
            owner = 'Bob'
            name = 'fake job name'
            test_retry = 0
            id = 1337

        class FakeHQE(object):
            job = FakeJob

        expected_command_line = expected_command_line_base.union(
                ['-u', FakeJob.owner, '-l', FakeJob.name])
        command_line = set(monitor_db._autoserv_command_line(
                machines, extra_args=[], queue_entry=FakeHQE, verbose=False))
        self.assertEqual(expected_command_line, command_line)


class AgentTaskTest(unittest.TestCase,
                    frontend_test_utils.FrontendTestMixin):
    """Tests for AgentTask drone-set resolution (get_drone_hostnames_allowed
    and _user_or_global_default_drone_set)."""

    def setUp(self):
        self._frontend_common_setup()


    def tearDown(self):
        self._frontend_common_teardown()


    def _setup_drones(self):
        """Create four drones split across three drone sets and four jobs:
        job 1 -> set 1 (drones 0,1), job 2 -> set 2 (drones 2,3),
        job 3 -> set 3 (empty), job 4 -> no drone set.  Returns the four
        corresponding HQEs and a fresh AgentTask."""
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)

        drones = []
        for x in xrange(4):
            drones.append(models.Drone.objects.create(hostname=str(x)))

        drone_set_1 = models.DroneSet.objects.create(name='1')
        drone_set_1.drones.add(*drones[0:2])
        drone_set_2 = models.DroneSet.objects.create(name='2')
        drone_set_2.drones.add(*drones[2:4])
        drone_set_3 = models.DroneSet.objects.create(name='3')

        job_1 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_1)
        job_2 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_2)
        job_3 = self._create_job_simple([self.hosts[0].id],
                                        drone_set=drone_set_3)

        job_4 = self._create_job_simple([self.hosts[0].id])
        job_4.drone_set = None
        job_4.save()

        hqe_1 = job_1.hostqueueentry_set.all()[0]
        hqe_2 = job_2.hostqueueentry_set.all()[0]
        hqe_3 = job_3.hostqueueentry_set.all()[0]
        hqe_4 = job_4.hostqueueentry_set.all()[0]

        return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()


    def test_get_drone_hostnames_allowed_no_drones_in_set(self):
        """A job whose drone set contains no drones yields an empty set."""
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[2].id,)
        self.assertEqual(set(), task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_drone_set(self):
        """A job without a drone set falls back to the user/global default."""
        hqes, task = self._setup_drones()
        hqe = hqes[3]
        task.queue_entry_ids = (hqe.id,)

        result = object()

        self.god.stub_function(task, '_user_or_global_default_drone_set')
        task._user_or_global_default_drone_set.expect_call(
                hqe.job, hqe.job.user()).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_success(self):
        """A job with a populated drone set yields its drone hostnames."""
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id,)
        self.assertEqual(set(('0','1')), task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_multiple_jobs(self):
        """Queue entries spanning more than one job are rejected."""
        hqes, task = self._setup_drones()
        task.queue_entry_ids = (hqes[0].id, hqes[1].id)
        self.assertRaises(AssertionError,
                          task.get_drone_hostnames_allowed)
        self.god.check_playback()


    def test_get_drone_hostnames_allowed_no_hqe(self):
        """A special task without queue entries resolves via the special
        task's requesting user."""
        class MockSpecialTask(object):
            requested_by = object()

        class MockSpecialAgentTask(agent_task.SpecialAgentTask):
            task = MockSpecialTask()
            queue_entry_ids = []
            def __init__(self, *args, **kwargs):
                # Skip SpecialAgentTask.__init__; the class attributes above
                # supply the state the method under test reads.
                pass

        task = MockSpecialAgentTask()
        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')
        self.god.stub_function(task, '_user_or_global_default_drone_set')

        result = object()
        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)
        task._user_or_global_default_drone_set.expect_call(
                task.task, MockSpecialTask.requested_by).and_return(result)

        self.assertEqual(result, task.get_drone_hostnames_allowed())
        self.god.check_playback()


    def _setup_test_user_or_global_default_drone_set(self):
        """Stub DroneSet.get_default() and return the sentinel its drone
        hostnames resolve to."""
        result = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return result

        self.god.stub_function(models.DroneSet, 'get_default')
        models.DroneSet.get_default.expect_call().and_return(MockDroneSet())
        return result


    def test_user_or_global_default_drone_set(self):
        """A user with a drone set uses it, not the global default."""
        expected = object()
        class MockDroneSet(object):
            def get_drone_hostnames(self):
                return expected
        class MockUser(object):
            drone_set = MockDroneSet()

        self._setup_test_user_or_global_default_drone_set()

        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user(self):
        """No user at all falls back to the global default drone set."""
        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, None)

        self.assertEqual(expected, actual)
        self.god.check_playback()


    def test_user_or_global_default_drone_set_no_user_drone_set(self):
        """A user without a drone set falls back to the global default."""
        class MockUser(object):
            drone_set = None
            login = None

        expected = self._setup_test_user_or_global_default_drone_set()
        actual = agent_task.AgentTask()._user_or_global_default_drone_set(
                None, MockUser())

        self.assertEqual(expected, actual)
        self.god.check_playback()


if __name__ == '__main__':
    unittest.main()