monitor_db_unittest.py revision a5288b4bb2b09aafe914d0b7d5aab79a7e433eaf
1#!/usr/bin/python 2 3import unittest, time, subprocess, os, StringIO, tempfile, datetime, shutil 4import logging 5import common 6import MySQLdb 7from autotest_lib.frontend import setup_django_environment 8from autotest_lib.frontend.afe import frontend_test_utils 9from autotest_lib.client.common_lib import global_config, host_protections 10from autotest_lib.client.common_lib.test_utils import mock 11from autotest_lib.database import database_connection, migrate 12from autotest_lib.frontend import thread_local 13from autotest_lib.frontend.afe import models 14from autotest_lib.scheduler import monitor_db, drone_manager, email_manager 15from autotest_lib.scheduler import scheduler_config 16 17_DEBUG = False 18 19 20class DummyAgent(object): 21 _is_running = False 22 _is_done = False 23 num_processes = 1 24 host_ids = [] 25 queue_entry_ids = [] 26 27 def is_running(self): 28 return self._is_running 29 30 31 def tick(self): 32 self._is_running = True 33 34 35 def is_done(self): 36 return self._is_done 37 38 39 def set_done(self, done): 40 self._is_done = done 41 self._is_running = not done 42 43 44class IsRow(mock.argument_comparator): 45 def __init__(self, row_id): 46 self.row_id = row_id 47 48 49 def is_satisfied_by(self, parameter): 50 return list(parameter)[0] == self.row_id 51 52 53 def __str__(self): 54 return 'row with id %s' % self.row_id 55 56 57class IsAgentWithTask(mock.argument_comparator): 58 def __init__(self, task): 59 self._task = task 60 61 62 def is_satisfied_by(self, parameter): 63 if not isinstance(parameter, monitor_db.Agent): 64 return False 65 tasks = list(parameter.queue.queue) 66 if len(tasks) != 1: 67 return False 68 return tasks[0] == self._task 69 70 71def _set_host_and_qe_ids(agent_or_task, id_list=None): 72 if id_list is None: 73 id_list = [] 74 agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list 75 76 77class BaseSchedulerTest(unittest.TestCase, 78 frontend_test_utils.FrontendTestMixin): 79 _config_section = 'AUTOTEST_WEB' 80 81 def _do_query(self, sql): 82 self._database.execute(sql) 83 84 85 def _set_monitor_stubs(self): 86 # Clear the instance cache as this is a brand new database. 87 monitor_db.DBObject._clear_instance_cache() 88 89 self._database = ( 90 database_connection.DatabaseConnection.get_test_database( 91 self._test_db_file)) 92 self._database.connect() 93 self._database.debug = _DEBUG 94 95 monitor_db._db = self._database 96 monitor_db._drone_manager._results_dir = '/test/path' 97 monitor_db._drone_manager._temporary_directory = '/test/path/tmp' 98 99 100 def setUp(self): 101 self._frontend_common_setup() 102 self._set_monitor_stubs() 103 self._dispatcher = monitor_db.Dispatcher() 104 105 106 def tearDown(self): 107 self._database.disconnect() 108 self._frontend_common_teardown() 109 110 111 def _update_hqe(self, set, where=''): 112 query = 'UPDATE host_queue_entries SET ' + set 113 if where: 114 query += ' WHERE ' + where 115 self._do_query(query) 116 117 118class DBObjectTest(BaseSchedulerTest): 119 # It may seem odd to subclass BaseSchedulerTest for this but it saves us 120 # duplicating some setup work for what we want to test. 121 122 123 def test_compare_fields_in_row(self): 124 host = monitor_db.Host(id=1) 125 fields = list(host._fields) 126 row_data = [getattr(host, fieldname) for fieldname in fields] 127 self.assertEqual({}, host._compare_fields_in_row(row_data)) 128 row_data[fields.index('hostname')] = 'spam' 129 self.assertEqual({'hostname': ('host1', 'spam')}, 130 host._compare_fields_in_row(row_data)) 131 row_data[fields.index('id')] = 23 132 self.assertEqual({'hostname': ('host1', 'spam'), 'id': (1, 23)}, 133 host._compare_fields_in_row(row_data)) 134 135 136 def test_always_query(self): 137 host_a = monitor_db.Host(id=2) 138 self.assertEqual(host_a.hostname, 'host2') 139 self._do_query('UPDATE hosts SET hostname="host2-updated" WHERE id=2') 140 host_b = monitor_db.Host(id=2, always_query=True) 141 self.assert_(host_a is host_b, 'Cached instance not returned.') 142 self.assertEqual(host_a.hostname, 'host2-updated', 143 'Database was not re-queried') 144 145 # If either of these are called, a query was made when it shouldn't be. 146 host_a._compare_fields_in_row = lambda _: self.fail('eek! a query!') 147 host_a._update_fields_from_row = host_a._compare_fields_in_row 148 host_c = monitor_db.Host(id=2, always_query=False) 149 self.assert_(host_a is host_c, 'Cached instance not returned') 150 151 152 def test_delete(self): 153 host = monitor_db.Host(id=3) 154 host.delete() 155 host = self.assertRaises(monitor_db.DBError, monitor_db.Host, id=3, 156 always_query=False) 157 host = self.assertRaises(monitor_db.DBError, monitor_db.Host, id=3, 158 always_query=True) 159 160 def test_save(self): 161 # Dummy Job to avoid creating a one in the HostQueueEntry __init__. 162 class MockJob(object): 163 def __init__(self, id): 164 pass 165 def tag(self): 166 return 'MockJob' 167 self.god.stub_with(monitor_db, 'Job', MockJob) 168 hqe = monitor_db.HostQueueEntry( 169 new_record=True, 170 row=[0, 1, 2, 'Queued', None, 0, 0, 0, '.', None, False, None]) 171 hqe.save() 172 new_id = hqe.id 173 # Force a re-query and verify that the correct data was stored. 174 monitor_db.DBObject._clear_instance_cache() 175 hqe = monitor_db.HostQueueEntry(id=new_id) 176 self.assertEqual(hqe.id, new_id) 177 self.assertEqual(hqe.job_id, 1) 178 self.assertEqual(hqe.host_id, 2) 179 self.assertEqual(hqe.status, 'Queued') 180 self.assertEqual(hqe.meta_host, None) 181 self.assertEqual(hqe.active, False) 182 self.assertEqual(hqe.complete, False) 183 self.assertEqual(hqe.deleted, False) 184 self.assertEqual(hqe.execution_subdir, '.') 185 self.assertEqual(hqe.atomic_group_id, None) 186 self.assertEqual(hqe.started_on, None) 187 188 189class DispatcherSchedulingTest(BaseSchedulerTest): 190 _jobs_scheduled = [] 191 192 193 def tearDown(self): 194 super(DispatcherSchedulingTest, self).tearDown() 195 196 197 def _set_monitor_stubs(self): 198 super(DispatcherSchedulingTest, self)._set_monitor_stubs() 199 200 def hqe__do_run_pre_job_tasks_stub(queue_entry): 201 """Return a test dummy. Called by HostQueueEntry.run().""" 202 self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id) 203 queue_entry.set_status('Starting') 204 return DummyAgent() 205 206 self.god.stub_with(monitor_db.HostQueueEntry, '_do_run_pre_job_tasks', 207 hqe__do_run_pre_job_tasks_stub) 208 209 def hqe_queue_log_record_stub(self, log_line): 210 """No-Op to avoid calls down to the _drone_manager during tests.""" 211 212 self.god.stub_with(monitor_db.HostQueueEntry, 'queue_log_record', 213 hqe_queue_log_record_stub) 214 215 216 def _record_job_scheduled(self, job_id, host_id): 217 record = (job_id, host_id) 218 self.assert_(record not in self._jobs_scheduled, 219 'Job %d scheduled on host %d twice' % 220 (job_id, host_id)) 221 self._jobs_scheduled.append(record) 222 223 224 def _assert_job_scheduled_on(self, job_id, host_id): 225 record = (job_id, host_id) 226 self.assert_(record in self._jobs_scheduled, 227 'Job %d not scheduled on host %d as expected\n' 228 'Jobs scheduled: %s' % 229 (job_id, host_id, self._jobs_scheduled)) 230 self._jobs_scheduled.remove(record) 231 232 233 def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number): 234 """Assert job was scheduled on exactly number hosts out of a set.""" 235 found = [] 236 for host_id in host_ids: 237 record = (job_id, host_id) 238 if record in self._jobs_scheduled: 239 found.append(record) 240 self._jobs_scheduled.remove(record) 241 if len(found) < number: 242 self.fail('Job %d scheduled on fewer than %d hosts in %s.\n' 243 'Jobs scheduled: %s' % (job_id, number, host_ids, found)) 244 elif len(found) > number: 245 self.fail('Job %d scheduled on more than %d hosts in %s.\n' 246 'Jobs scheduled: %s' % (job_id, number, host_ids, found)) 247 248 249 def _check_for_extra_schedulings(self): 250 if len(self._jobs_scheduled) != 0: 251 self.fail('Extra jobs scheduled: ' + 252 str(self._jobs_scheduled)) 253 254 255 def _convert_jobs_to_metahosts(self, *job_ids): 256 sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')' 257 self._do_query('UPDATE host_queue_entries SET ' 258 'meta_host=host_id, host_id=NULL ' 259 'WHERE job_id IN ' + sql_tuple) 260 261 262 def _lock_host(self, host_id): 263 self._do_query('UPDATE hosts SET locked=1 WHERE id=' + 264 str(host_id)) 265 266 267 def setUp(self): 268 super(DispatcherSchedulingTest, self).setUp() 269 self._jobs_scheduled = [] 270 271 272 def _test_basic_scheduling_helper(self, use_metahosts): 273 'Basic nonmetahost scheduling' 274 self._create_job_simple([1], use_metahosts) 275 self._create_job_simple([2], use_metahosts) 276 self._dispatcher._schedule_new_jobs() 277 self._assert_job_scheduled_on(1, 1) 278 self._assert_job_scheduled_on(2, 2) 279 self._check_for_extra_schedulings() 280 281 282 def _test_priorities_helper(self, use_metahosts): 283 'Test prioritization ordering' 284 self._create_job_simple([1], use_metahosts) 285 self._create_job_simple([2], use_metahosts) 286 self._create_job_simple([1,2], use_metahosts) 287 self._create_job_simple([1], use_metahosts, priority=1) 288 self._dispatcher._schedule_new_jobs() 289 self._assert_job_scheduled_on(4, 1) # higher priority 290 self._assert_job_scheduled_on(2, 2) # earlier job over later 291 self._check_for_extra_schedulings() 292 293 294 def _test_hosts_ready_helper(self, use_metahosts): 295 """ 296 Only hosts that are status=Ready, unlocked and not invalid get 297 scheduled. 298 """ 299 self._create_job_simple([1], use_metahosts) 300 self._do_query('UPDATE hosts SET status="Running" WHERE id=1') 301 self._dispatcher._schedule_new_jobs() 302 self._check_for_extra_schedulings() 303 304 self._do_query('UPDATE hosts SET status="Ready", locked=1 ' 305 'WHERE id=1') 306 self._dispatcher._schedule_new_jobs() 307 self._check_for_extra_schedulings() 308 309 self._do_query('UPDATE hosts SET locked=0, invalid=1 ' 310 'WHERE id=1') 311 self._dispatcher._schedule_new_jobs() 312 if not use_metahosts: 313 self._assert_job_scheduled_on(1, 1) 314 self._check_for_extra_schedulings() 315 316 317 def _test_hosts_idle_helper(self, use_metahosts): 318 'Only idle hosts get scheduled' 319 self._create_job(hosts=[1], active=True) 320 self._create_job_simple([1], use_metahosts) 321 self._dispatcher._schedule_new_jobs() 322 self._check_for_extra_schedulings() 323 324 325 def _test_obey_ACLs_helper(self, use_metahosts): 326 self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1') 327 self._create_job_simple([1], use_metahosts) 328 self._dispatcher._schedule_new_jobs() 329 self._check_for_extra_schedulings() 330 331 332 def test_basic_scheduling(self): 333 self._test_basic_scheduling_helper(False) 334 335 336 def test_priorities(self): 337 self._test_priorities_helper(False) 338 339 340 def test_hosts_ready(self): 341 self._test_hosts_ready_helper(False) 342 343 344 def test_hosts_idle(self): 345 self._test_hosts_idle_helper(False) 346 347 348 def test_obey_ACLs(self): 349 self._test_obey_ACLs_helper(False) 350 351 352 def test_one_time_hosts_ignore_ACLs(self): 353 self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1') 354 self._do_query('UPDATE hosts SET invalid=1 WHERE id=1') 355 self._create_job_simple([1]) 356 self._dispatcher._schedule_new_jobs() 357 self._assert_job_scheduled_on(1, 1) 358 self._check_for_extra_schedulings() 359 360 361 def test_non_metahost_on_invalid_host(self): 362 """ 363 Non-metahost entries can get scheduled on invalid hosts (this is how 364 one-time hosts work). 365 """ 366 self._do_query('UPDATE hosts SET invalid=1') 367 self._test_basic_scheduling_helper(False) 368 369 370 def test_metahost_scheduling(self): 371 """ 372 Basic metahost scheduling 373 """ 374 self._test_basic_scheduling_helper(True) 375 376 377 def test_metahost_priorities(self): 378 self._test_priorities_helper(True) 379 380 381 def test_metahost_hosts_ready(self): 382 self._test_hosts_ready_helper(True) 383 384 385 def test_metahost_hosts_idle(self): 386 self._test_hosts_idle_helper(True) 387 388 389 def test_metahost_obey_ACLs(self): 390 self._test_obey_ACLs_helper(True) 391 392 393 def _setup_test_only_if_needed_labels(self): 394 # apply only_if_needed label3 to host1 395 models.Host.smart_get('host1').labels.add(self.label3) 396 return self._create_job_simple([1], use_metahost=True) 397 398 399 def test_only_if_needed_labels_avoids_host(self): 400 job = self._setup_test_only_if_needed_labels() 401 # if the job doesn't depend on label3, there should be no scheduling 402 self._dispatcher._schedule_new_jobs() 403 self._check_for_extra_schedulings() 404 405 406 def test_only_if_needed_labels_schedules(self): 407 job = self._setup_test_only_if_needed_labels() 408 job.dependency_labels.add(self.label3) 409 self._dispatcher._schedule_new_jobs() 410 self._assert_job_scheduled_on(1, 1) 411 self._check_for_extra_schedulings() 412 413 414 def test_only_if_needed_labels_via_metahost(self): 415 job = self._setup_test_only_if_needed_labels() 416 job.dependency_labels.add(self.label3) 417 # should also work if the metahost is the only_if_needed label 418 self._do_query('DELETE FROM jobs_dependency_labels') 419 self._create_job(metahosts=[3]) 420 self._dispatcher._schedule_new_jobs() 421 self._assert_job_scheduled_on(2, 1) 422 self._check_for_extra_schedulings() 423 424 425 def test_nonmetahost_over_metahost(self): 426 """ 427 Non-metahost entries should take priority over metahost entries 428 for the same host 429 """ 430 self._create_job(metahosts=[1]) 431 self._create_job(hosts=[1]) 432 self._dispatcher._schedule_new_jobs() 433 self._assert_job_scheduled_on(2, 1) 434 self._check_for_extra_schedulings() 435 436 437 def test_metahosts_obey_blocks(self): 438 """ 439 Metahosts can't get scheduled on hosts already scheduled for 440 that job. 441 """ 442 self._create_job(metahosts=[1], hosts=[1]) 443 # make the nonmetahost entry complete, so the metahost can try 444 # to get scheduled 445 self._update_hqe(set='complete = 1', where='host_id=1') 446 self._dispatcher._schedule_new_jobs() 447 self._check_for_extra_schedulings() 448 449 450 # TODO(gps): These should probably live in their own TestCase class 451 # specific to testing HostScheduler methods directly. It was convenient 452 # to put it here for now to share existing test environment setup code. 453 def test_HostScheduler_check_atomic_group_labels(self): 454 normal_job = self._create_job(metahosts=[0]) 455 atomic_job = self._create_job(atomic_group=1) 456 # Indirectly initialize the internal state of the host scheduler. 457 self._dispatcher._refresh_pending_queue_entries() 458 459 atomic_hqe = monitor_db.HostQueueEntry.fetch(where='job_id=%d' % 460 atomic_job.id).next() 461 normal_hqe = monitor_db.HostQueueEntry.fetch(where='job_id=%d' % 462 normal_job.id).next() 463 464 host_scheduler = self._dispatcher._host_scheduler 465 self.assertTrue(host_scheduler._check_atomic_group_labels( 466 [self.label4.id], atomic_hqe)) 467 self.assertFalse(host_scheduler._check_atomic_group_labels( 468 [self.label4.id], normal_hqe)) 469 self.assertFalse(host_scheduler._check_atomic_group_labels( 470 [self.label5.id, self.label6.id, self.label7.id], normal_hqe)) 471 self.assertTrue(host_scheduler._check_atomic_group_labels( 472 [self.label4.id, self.label6.id], atomic_hqe)) 473 self.assertTrue(host_scheduler._check_atomic_group_labels( 474 [self.label4.id, self.label5.id], 475 atomic_hqe)) 476 477 478 def test_HostScheduler_get_host_atomic_group_id(self): 479 job = self._create_job(metahosts=[self.label6.id]) 480 queue_entry = monitor_db.HostQueueEntry.fetch( 481 where='job_id=%d' % job.id).next() 482 # Indirectly initialize the internal state of the host scheduler. 483 self._dispatcher._refresh_pending_queue_entries() 484 485 # Test the host scheduler 486 host_scheduler = self._dispatcher._host_scheduler 487 488 # Two labels each in a different atomic group. This should log an 489 # error and continue. 490 orig_logging_error = logging.error 491 def mock_logging_error(message, *args): 492 mock_logging_error._num_calls += 1 493 # Test the logging call itself, we just wrapped it to count it. 494 orig_logging_error(message, *args) 495 mock_logging_error._num_calls = 0 496 self.god.stub_with(logging, 'error', mock_logging_error) 497 self.assertNotEquals(None, host_scheduler._get_host_atomic_group_id( 498 [self.label4.id, self.label8.id], queue_entry)) 499 self.assertTrue(mock_logging_error._num_calls > 0) 500 self.god.unstub(logging, 'error') 501 502 # Two labels both in the same atomic group, this should not raise an 503 # error, it will merely cause the job to schedule on the intersection. 504 self.assertEquals(1, host_scheduler._get_host_atomic_group_id( 505 [self.label4.id, self.label5.id])) 506 507 self.assertEquals(None, host_scheduler._get_host_atomic_group_id([])) 508 self.assertEquals(None, host_scheduler._get_host_atomic_group_id( 509 [self.label3.id, self.label7.id, self.label6.id])) 510 self.assertEquals(1, host_scheduler._get_host_atomic_group_id( 511 [self.label4.id, self.label7.id, self.label6.id])) 512 self.assertEquals(1, host_scheduler._get_host_atomic_group_id( 513 [self.label7.id, self.label5.id])) 514 515 516 def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self): 517 # Create a job scheduled to run on label6. 518 self._create_job(metahosts=[self.label6.id]) 519 self._dispatcher._schedule_new_jobs() 520 # label6 only has hosts that are in atomic groups associated with it, 521 # there should be no scheduling. 522 self._check_for_extra_schedulings() 523 524 525 def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self): 526 # Create a job scheduled to run on label5. This is an atomic group 527 # label but this job does not request atomic group scheduling. 528 self._create_job(metahosts=[self.label5.id]) 529 self._dispatcher._schedule_new_jobs() 530 # label6 only has hosts that are in atomic groups associated with it, 531 # there should be no scheduling. 532 self._check_for_extra_schedulings() 533 534 535 def test_atomic_group_scheduling_basics(self): 536 # Create jobs scheduled to run on an atomic group. 537 job_a = self._create_job(synchronous=True, metahosts=[self.label4.id], 538 atomic_group=1) 539 job_b = self._create_job(synchronous=True, metahosts=[self.label5.id], 540 atomic_group=1) 541 self._dispatcher._schedule_new_jobs() 542 # atomic_group.max_number_of_machines was 2 so we should run on 2. 543 self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2) 544 self._assert_job_scheduled_on(job_b.id, 8) # label5 545 self._assert_job_scheduled_on(job_b.id, 9) # label5 546 self._check_for_extra_schedulings() 547 548 # The three host label4 atomic group still has one host available. 549 # That means a job with a synch_count of 1 asking to be scheduled on 550 # the atomic group can still use the final machine. 551 # 552 # This may seem like a somewhat odd use case. It allows the use of an 553 # atomic group as a set of machines to run smaller jobs within (a set 554 # of hosts configured for use in network tests with eachother perhaps?) 555 onehost_job = self._create_job(atomic_group=1) 556 self._dispatcher._schedule_new_jobs() 557 self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1) 558 self._check_for_extra_schedulings() 559 560 # No more atomic groups have hosts available, no more jobs should 561 # be scheduled. 562 self._create_job(atomic_group=1) 563 self._dispatcher._schedule_new_jobs() 564 self._check_for_extra_schedulings() 565 566 567 def test_atomic_group_scheduling_obeys_acls(self): 568 # Request scheduling on a specific atomic label but be denied by ACLs. 569 self._do_query('DELETE FROM acl_groups_hosts WHERE host_id in (8,9)') 570 job = self._create_job(metahosts=[self.label5.id], atomic_group=1) 571 self._dispatcher._schedule_new_jobs() 572 self._check_for_extra_schedulings() 573 574 575 def test_atomic_group_scheduling_dependency_label_exclude(self): 576 # A dependency label that matches no hosts in the atomic group. 577 job_a = self._create_job(atomic_group=1) 578 job_a.dependency_labels.add(self.label3) 579 self._dispatcher._schedule_new_jobs() 580 self._check_for_extra_schedulings() 581 582 583 def test_atomic_group_scheduling_metahost_dependency_label_exclude(self): 584 # A metahost and dependency label that excludes too many hosts. 585 job_b = self._create_job(synchronous=True, metahosts=[self.label4.id], 586 atomic_group=1) 587 job_b.dependency_labels.add(self.label7) 588 self._dispatcher._schedule_new_jobs() 589 self._check_for_extra_schedulings() 590 591 592 def test_atomic_group_scheduling_dependency_label_match(self): 593 # A dependency label that exists on enough atomic group hosts in only 594 # one of the two atomic group labels. 595 job_c = self._create_job(synchronous=True, atomic_group=1) 596 job_c.dependency_labels.add(self.label7) 597 self._dispatcher._schedule_new_jobs() 598 self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2) 599 self._check_for_extra_schedulings() 600 601 602 def test_atomic_group_scheduling_no_metahost(self): 603 # Force it to schedule on the other group for a reliable test. 604 self._do_query('UPDATE hosts SET invalid=1 WHERE id=9') 605 # An atomic job without a metahost. 606 job = self._create_job(synchronous=True, atomic_group=1) 607 self._dispatcher._schedule_new_jobs() 608 self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2) 609 self._check_for_extra_schedulings() 610 611 612 def test_atomic_group_scheduling_partial_group(self): 613 # Make one host in labels[3] unavailable so that there are only two 614 # hosts left in the group. 615 self._do_query('UPDATE hosts SET status="Repair Failed" WHERE id=5') 616 job = self._create_job(synchronous=True, metahosts=[self.label4.id], 617 atomic_group=1) 618 self._dispatcher._schedule_new_jobs() 619 # Verify that it was scheduled on the 2 ready hosts in that group. 620 self._assert_job_scheduled_on(job.id, 6) 621 self._assert_job_scheduled_on(job.id, 7) 622 self._check_for_extra_schedulings() 623 624 625 def test_atomic_group_scheduling_not_enough_available(self): 626 # Mark some hosts in each atomic group label as not usable. 627 # One host running, another invalid in the first group label. 628 self._do_query('UPDATE hosts SET status="Running" WHERE id=5') 629 self._do_query('UPDATE hosts SET invalid=1 WHERE id=6') 630 # One host invalid in the second group label. 631 self._do_query('UPDATE hosts SET invalid=1 WHERE id=9') 632 # Nothing to schedule when no group label has enough (2) good hosts.. 633 self._create_job(atomic_group=1, synchronous=True) 634 self._dispatcher._schedule_new_jobs() 635 # There are not enough hosts in either atomic group, 636 # No more scheduling should occur. 637 self._check_for_extra_schedulings() 638 639 # Now create an atomic job that has a synch count of 1. It should 640 # schedule on exactly one of the hosts. 641 onehost_job = self._create_job(atomic_group=1) 642 self._dispatcher._schedule_new_jobs() 643 self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1) 644 645 646 def test_atomic_group_scheduling_no_valid_hosts(self): 647 self._do_query('UPDATE hosts SET invalid=1 WHERE id in (8,9)') 648 self._create_job(synchronous=True, metahosts=[self.label5.id], 649 atomic_group=1) 650 self._dispatcher._schedule_new_jobs() 651 # no hosts in the selected group and label are valid. no schedulings. 652 self._check_for_extra_schedulings() 653 654 655 def test_atomic_group_scheduling_metahost_works(self): 656 # Test that atomic group scheduling also obeys metahosts. 657 self._create_job(metahosts=[0], atomic_group=1) 658 self._dispatcher._schedule_new_jobs() 659 # There are no atomic group hosts that also have that metahost. 660 self._check_for_extra_schedulings() 661 662 job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1) 663 self._dispatcher._schedule_new_jobs() 664 self._assert_job_scheduled_on(job_b.id, 8) 665 self._assert_job_scheduled_on(job_b.id, 9) 666 self._check_for_extra_schedulings() 667 668 669 def test_atomic_group_skips_ineligible_hosts(self): 670 # Test hosts marked ineligible for this job are not eligible. 671 # How would this ever happen anyways? 672 job = self._create_job(metahosts=[self.label4.id], atomic_group=1) 673 models.IneligibleHostQueue.objects.create(job=job, host_id=5) 674 models.IneligibleHostQueue.objects.create(job=job, host_id=6) 675 models.IneligibleHostQueue.objects.create(job=job, host_id=7) 676 self._dispatcher._schedule_new_jobs() 677 # No scheduling should occur as all desired hosts were ineligible. 678 self._check_for_extra_schedulings() 679 680 681 def test_atomic_group_scheduling_fail(self): 682 # If synch_count is > the atomic group number of machines, the job 683 # should be aborted immediately. 684 model_job = self._create_job(synchronous=True, atomic_group=1) 685 model_job.synch_count = 4 686 model_job.save() 687 job = monitor_db.Job(id=model_job.id) 688 self._dispatcher._schedule_new_jobs() 689 self._check_for_extra_schedulings() 690 queue_entries = job.get_host_queue_entries() 691 self.assertEqual(1, len(queue_entries)) 692 self.assertEqual(queue_entries[0].status, 693 models.HostQueueEntry.Status.ABORTED) 694 695 696 def test_atomic_group_no_labels_no_scheduling(self): 697 # Never schedule on atomic groups marked invalid. 698 job = self._create_job(metahosts=[self.label5.id], synchronous=True, 699 atomic_group=1) 700 # Deleting an atomic group via the frontend marks it invalid and 701 # removes all label references to the group. The job now references 702 # an invalid atomic group with no labels associated with it. 703 self.label5.atomic_group.invalid = True 704 self.label5.atomic_group.save() 705 self.label5.atomic_group = None 706 self.label5.save() 707 708 self._dispatcher._schedule_new_jobs() 709 self._check_for_extra_schedulings() 710 711 712 def test_schedule_directly_on_atomic_group_host_fail(self): 713 # Scheduling a job directly on hosts in an atomic group must 714 # fail to avoid users inadvertently holding up the use of an 715 # entire atomic group by using the machines individually. 716 job = self._create_job(hosts=[5]) 717 self._dispatcher._schedule_new_jobs() 718 self._check_for_extra_schedulings() 719 720 721 def test_schedule_directly_on_atomic_group_host(self): 722 # Scheduling a job directly on one host in an atomic group will 723 # work when the atomic group is listed on the HQE in addition 724 # to the host (assuming the sync count is 1). 725 job = self._create_job(hosts=[5], atomic_group=1) 726 self._dispatcher._schedule_new_jobs() 727 self._assert_job_scheduled_on(job.id, 5) 728 self._check_for_extra_schedulings() 729 730 731 def test_schedule_directly_on_atomic_group_hosts_sync2(self): 732 job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True) 733 self._dispatcher._schedule_new_jobs() 734 self._assert_job_scheduled_on(job.id, 5) 735 self._assert_job_scheduled_on(job.id, 8) 736 self._check_for_extra_schedulings() 737 738 739 def test_schedule_directly_on_atomic_group_hosts_wrong_group(self): 740 job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True) 741 self._dispatcher._schedule_new_jobs() 742 self._check_for_extra_schedulings() 743 744 745 def test_only_schedule_queued_entries(self): 746 self._create_job(metahosts=[1]) 747 self._update_hqe(set='active=1, host_id=2') 748 self._dispatcher._schedule_new_jobs() 749 self._check_for_extra_schedulings() 750 751 752 def test_no_ready_hosts(self): 753 self._create_job(hosts=[1]) 754 self._do_query('UPDATE hosts SET status="Repair Failed"') 755 self._dispatcher._schedule_new_jobs() 756 self._check_for_extra_schedulings() 757 758 759class DispatcherThrottlingTest(BaseSchedulerTest): 760 """ 761 Test that the dispatcher throttles: 762 * total number of running processes 763 * number of processes started per cycle 764 """ 765 _MAX_RUNNING = 3 766 _MAX_STARTED = 2 767 768 def setUp(self): 769 super(DispatcherThrottlingTest, self).setUp() 770 scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING 771 scheduler_config.config.max_processes_started_per_cycle = ( 772 self._MAX_STARTED) 773 774 def fake_max_runnable_processes(fake_self): 775 running = sum(agent.num_processes 776 for agent in self._agents 777 if agent.is_running()) 778 return self._MAX_RUNNING - running 779 self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes', 780 fake_max_runnable_processes) 781 782 783 def _setup_some_agents(self, num_agents): 784 self._agents = [DummyAgent() for i in xrange(num_agents)] 785 self._dispatcher._agents = list(self._agents) 786 787 788 def _run_a_few_cycles(self): 789 for i in xrange(4): 790 self._dispatcher._handle_agents() 791 792 793 def _assert_agents_started(self, indexes, is_started=True): 794 for i in indexes: 795 self.assert_(self._agents[i].is_running() == is_started, 796 'Agent %d %sstarted' % 797 (i, is_started and 'not ' or '')) 798 799 800 def _assert_agents_not_started(self, indexes): 801 self._assert_agents_started(indexes, False) 802 803 804 def test_throttle_total(self): 805 self._setup_some_agents(4) 806 self._run_a_few_cycles() 807 self._assert_agents_started([0, 1, 2]) 808 self._assert_agents_not_started([3]) 809 810 811 def test_throttle_per_cycle(self): 812 self._setup_some_agents(3) 813 self._dispatcher._handle_agents() 814 self._assert_agents_started([0, 1]) 815 self._assert_agents_not_started([2]) 816 817 818 def test_throttle_with_synchronous(self): 819 self._setup_some_agents(2) 820 self._agents[0].num_processes = 3 821 self._run_a_few_cycles() 822 self._assert_agents_started([0]) 823 self._assert_agents_not_started([1]) 824 825 826 def test_large_agent_starvation(self): 827 """ 828 Ensure large agents don't get starved by lower-priority agents. 829 """ 830 self._setup_some_agents(3) 831 self._agents[1].num_processes = 3 832 self._run_a_few_cycles() 833 self._assert_agents_started([0]) 834 self._assert_agents_not_started([1, 2]) 835 836 self._agents[0].set_done(True) 837 self._run_a_few_cycles() 838 self._assert_agents_started([1]) 839 self._assert_agents_not_started([2]) 840 841 842 def test_zero_process_agent(self): 843 self._setup_some_agents(5) 844 self._agents[4].num_processes = 0 845 self._run_a_few_cycles() 846 self._assert_agents_started([0, 1, 2, 4]) 847 self._assert_agents_not_started([3]) 848 849 850class FindAbortTest(BaseSchedulerTest): 851 """ 852 Test the dispatcher abort functionality. 853 """ 854 def _check_host_agent(self, agent, host_id): 855 self.assert_(isinstance(agent, monitor_db.Agent)) 856 tasks = list(agent.queue.queue) 857 self.assertEquals(len(tasks), 2) 858 cleanup, verify = tasks 859 860 self.assert_(isinstance(cleanup, monitor_db.CleanupTask)) 861 self.assertEquals(cleanup.host.id, host_id) 862 863 self.assert_(isinstance(verify, monitor_db.VerifyTask)) 864 self.assertEquals(verify.host.id, host_id) 865 866 867 def _check_agents(self, agents): 868 agents = list(agents) 869 self.assertEquals(len(agents), 3) 870 self.assertEquals(agents[0], self._agent) 871 self._check_host_agent(agents[1], 1) 872 self._check_host_agent(agents[2], 2) 873 874 875 def _common_setup(self): 876 self._create_job(hosts=[1, 2]) 877 self._update_hqe(set='aborted=1') 878 self._agent = self.god.create_mock_class(monitor_db.Agent, 'old_agent') 879 _set_host_and_qe_ids(self._agent, [1, 2]) 880 self._agent.abort.expect_call() 881 self._agent.abort.expect_call() # gets called once for each HQE 882 self._dispatcher.add_agent(self._agent) 883 884 885 def test_find_aborting(self): 886 self._common_setup() 887 self._dispatcher._find_aborting() 888 self.god.check_playback() 889 890 891 def test_find_aborting_verifying(self): 892 self._common_setup() 893 self._update_hqe(set='active=1, status="Verifying"') 894 895 self._dispatcher._find_aborting() 896 897 self._check_agents(self._dispatcher._agents) 898 self.god.check_playback() 899 900 901class JobTimeoutTest(BaseSchedulerTest): 902 def _test_synch_start_timeout_helper(self, expect_abort, 903 set_created_on=True, set_active=True, 904 set_acl=True): 905 scheduler_config.config.synch_job_start_timeout_minutes = 60 906 job = self._create_job(hosts=[1, 2]) 907 if set_active: 908 hqe = job.hostqueueentry_set.filter(host__id=1)[0] 909 hqe.status = 'Pending' 910 hqe.active = 1 911 hqe.save() 912 913 everyone_acl = models.AclGroup.smart_get('Everyone') 914 host1 = models.Host.smart_get(1) 915 if set_acl: 916 everyone_acl.hosts.add(host1) 917 else: 918 everyone_acl.hosts.remove(host1) 919 920 job.created_on = datetime.datetime.now() 921 if set_created_on: 922 job.created_on -= datetime.timedelta(minutes=100) 923 job.save() 924 925 cleanup = self._dispatcher._periodic_cleanup 926 cleanup._abort_jobs_past_synch_start_timeout() 927 928 for hqe in job.hostqueueentry_set.all(): 929 self.assertEquals(hqe.aborted, expect_abort) 930 931 932 def test_synch_start_timeout_helper(self): 933 # no abort if any of the condition aren't met 934 self._test_synch_start_timeout_helper(False, set_created_on=False) 935 self._test_synch_start_timeout_helper(False, set_active=False) 936 self._test_synch_start_timeout_helper(False, set_acl=False) 937 # abort if all conditions are met 938 self._test_synch_start_timeout_helper(True) 939 940 941class PidfileRunMonitorTest(unittest.TestCase): 942 execution_tag = 'test_tag' 943 pid = 12345 944 process = drone_manager.Process('myhost', pid) 945 num_tests_failed = 1 946 947 def setUp(self): 948 self.god = mock.mock_god() 949 self.mock_drone_manager = self.god.create_mock_class( 950 drone_manager.DroneManager, 'drone_manager') 951 self.god.stub_with(monitor_db, '_drone_manager', 952 self.mock_drone_manager) 953 self.god.stub_function(email_manager.manager, 'enqueue_notify_email') 954 955 self.pidfile_id = object() 956 957 (self.mock_drone_manager.get_pidfile_id_from 958 .expect_call(self.execution_tag, 959 pidfile_name=monitor_db._AUTOSERV_PID_FILE) 960 .and_return(self.pidfile_id)) 961 self.mock_drone_manager.register_pidfile.expect_call(self.pidfile_id) 962 963 self.monitor = monitor_db.PidfileRunMonitor() 964 self.monitor.attach_to_existing_process(self.execution_tag) 965 966 967 def tearDown(self): 968 self.god.unstub_all() 969 970 971 def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None, 972 use_second_read=False): 973 contents = drone_manager.PidfileContents() 974 if pid is not None: 975 contents.process = drone_manager.Process('myhost', pid) 976 contents.exit_status = exit_code 977 contents.num_tests_failed = tests_failed 978 self.mock_drone_manager.get_pidfile_contents.expect_call( 979 self.pidfile_id, use_second_read=use_second_read).and_return( 980 contents) 981 982 983 def set_not_yet_run(self): 984 self.setup_pidfile() 985 986 987 def set_empty_pidfile(self): 988 self.setup_pidfile() 989 990 991 def set_running(self, use_second_read=False): 992 self.setup_pidfile(self.pid, use_second_read=use_second_read) 993 994 995 def set_complete(self, error_code, use_second_read=False): 996 self.setup_pidfile(self.pid, error_code, self.num_tests_failed, 997 use_second_read=use_second_read) 998 999 1000 def _check_monitor(self, expected_pid, expected_exit_status, 1001 expected_num_tests_failed): 1002 if expected_pid is None: 1003 self.assertEquals(self.monitor._state.process, None) 1004 else: 1005 self.assertEquals(self.monitor._state.process.pid, expected_pid) 1006 self.assertEquals(self.monitor._state.exit_status, expected_exit_status) 1007 self.assertEquals(self.monitor._state.num_tests_failed, 1008 expected_num_tests_failed) 1009 1010 1011 self.god.check_playback() 1012 1013 1014 def _test_read_pidfile_helper(self, expected_pid, expected_exit_status, 1015 expected_num_tests_failed): 1016 self.monitor._read_pidfile() 1017 self._check_monitor(expected_pid, expected_exit_status, 1018 expected_num_tests_failed) 1019 1020 1021 def _get_expected_tests_failed(self, expected_exit_status): 1022 if expected_exit_status is None: 1023 expected_tests_failed = None 1024 else: 1025 expected_tests_failed = self.num_tests_failed 1026 return expected_tests_failed 1027 1028 1029 def test_read_pidfile(self): 1030 self.set_not_yet_run() 1031 self._test_read_pidfile_helper(None, None, None) 1032 1033 self.set_empty_pidfile() 1034 self._test_read_pidfile_helper(None, None, None) 1035 1036 self.set_running() 1037 self._test_read_pidfile_helper(self.pid, None, None) 1038 1039 self.set_complete(123) 1040 self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed) 1041 1042 1043 def test_read_pidfile_error(self): 1044 self.mock_drone_manager.get_pidfile_contents.expect_call( 1045 self.pidfile_id, use_second_read=False).and_return( 1046 drone_manager.InvalidPidfile('error')) 1047 self.assertRaises(monitor_db.PidfileRunMonitor._PidfileException, 1048 self.monitor._read_pidfile) 1049 self.god.check_playback() 1050 1051 1052 def setup_is_running(self, is_running): 1053 self.mock_drone_manager.is_process_running.expect_call( 1054 self.process).and_return(is_running) 1055 1056 1057 def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status, 1058 expected_num_tests_failed): 1059 self.monitor._get_pidfile_info() 1060 self._check_monitor(expected_pid, expected_exit_status, 1061 expected_num_tests_failed) 1062 1063 1064 def test_get_pidfile_info(self): 1065 """ 1066 normal cases for get_pidfile_info 1067 """ 1068 # running 1069 self.set_running() 1070 self.setup_is_running(True) 1071 self._test_get_pidfile_info_helper(self.pid, None, None) 1072 1073 # exited during check 1074 self.set_running() 1075 self.setup_is_running(False) 1076 self.set_complete(123, use_second_read=True) # pidfile gets read again 1077 self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed) 1078 1079 # completed 1080 self.set_complete(123) 1081 self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed) 1082 1083 1084 def test_get_pidfile_info_running_no_proc(self): 1085 """ 1086 pidfile shows process running, but no proc exists 1087 """ 1088 # running but no proc 1089 self.set_running() 1090 self.setup_is_running(False) 1091 self.set_running(use_second_read=True) 1092 email_manager.manager.enqueue_notify_email.expect_call( 1093 mock.is_string_comparator(), mock.is_string_comparator()) 1094 self._test_get_pidfile_info_helper(self.pid, 1, 0) 1095 self.assertTrue(self.monitor.lost_process) 1096 1097 1098 def test_get_pidfile_info_not_yet_run(self): 1099 """ 1100 pidfile hasn't been written yet 1101 """ 1102 self.set_not_yet_run() 1103 self._test_get_pidfile_info_helper(None, None, None) 1104 1105 1106 def test_process_failed_to_write_pidfile(self): 1107 self.set_not_yet_run() 1108 email_manager.manager.enqueue_notify_email.expect_call( 1109 mock.is_string_comparator(), mock.is_string_comparator()) 1110 self.monitor._start_time = time.time() - monitor_db.PIDFILE_TIMEOUT - 1 1111 self._test_get_pidfile_info_helper(None, 1, 0) 1112 self.assertTrue(self.monitor.lost_process) 1113 1114 1115class AgentTest(unittest.TestCase): 1116 def setUp(self): 1117 self.god = mock.mock_god() 1118 self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher, 1119 'dispatcher') 1120 1121 1122 def tearDown(self): 1123 self.god.unstub_all() 1124 1125 1126 def _create_mock_task(self, name): 1127 task = self.god.create_mock_class(monitor_db.AgentTask, name) 1128 _set_host_and_qe_ids(task) 1129 return task 1130 1131 def _create_agent(self, tasks): 1132 agent = monitor_db.Agent(tasks) 1133 agent.dispatcher = self._dispatcher 1134 return agent 1135 1136 1137 def _finish_agent(self, agent): 1138 while not agent.is_done(): 1139 agent.tick() 1140 1141 1142 def test_agent(self): 1143 task1 = self._create_mock_task('task1') 1144 task2 = self._create_mock_task('task2') 1145 task3 = self._create_mock_task('task3') 1146 task1.poll.expect_call() 1147 task1.is_done.expect_call().and_return(False) 1148 task1.poll.expect_call() 1149 task1.is_done.expect_call().and_return(True) 1150 task1.is_done.expect_call().and_return(True) 1151 task1.success = True 1152 1153 task2.poll.expect_call() 1154 task2.is_done.expect_call().and_return(True) 1155 task2.is_done.expect_call().and_return(True) 1156 task2.success = False 1157 task2.failure_tasks = [task3] 1158 1159 self._dispatcher.add_agent.expect_call(IsAgentWithTask(task3)) 1160 1161 agent = self._create_agent([task1, task2]) 1162 self._finish_agent(agent) 1163 self.god.check_playback() 1164 1165 1166 def _test_agent_abort_helper(self, ignore_abort=False): 1167 task1 = self._create_mock_task('task1') 1168 task2 = self._create_mock_task('task2') 1169 task1.poll.expect_call() 1170 task1.is_done.expect_call().and_return(False) 1171 task1.abort.expect_call() 1172 if ignore_abort: 1173 task1.aborted = False # task ignores abort; execution continues 1174 1175 task1.poll.expect_call() 1176 task1.is_done.expect_call().and_return(True) 1177 task1.is_done.expect_call().and_return(True) 1178 task1.success = True 1179 1180 task2.poll.expect_call() 1181 task2.is_done.expect_call().and_return(True) 1182 task2.is_done.expect_call().and_return(True) 1183 task2.success = True 1184 else: 1185 task1.aborted = True 1186 task2.abort.expect_call() 1187 task2.aborted = True 1188 1189 agent = self._create_agent([task1, task2]) 1190 agent.tick() 1191 agent.abort() 1192 self._finish_agent(agent) 1193 self.god.check_playback() 1194 1195 1196 def test_agent_abort(self): 1197 self._test_agent_abort_helper() 1198 self._test_agent_abort_helper(True) 1199 1200 1201 def _test_agent_abort_before_started_helper(self, ignore_abort=False): 1202 task = self._create_mock_task('task') 1203 task.abort.expect_call() 1204 if ignore_abort: 1205 task.aborted = False 1206 task.poll.expect_call() 1207 task.is_done.expect_call().and_return(True) 1208 task.is_done.expect_call().and_return(True) 1209 task.success = True 1210 else: 1211 task.aborted = True 1212 1213 agent = self._create_agent([task]) 1214 agent.abort() 1215 self._finish_agent(agent) 1216 self.god.check_playback() 1217 1218 1219 def test_agent_abort_before_started(self): 1220 self._test_agent_abort_before_started_helper() 1221 self._test_agent_abort_before_started_helper(True) 1222 1223 1224class DelayedCallTaskTest(unittest.TestCase): 1225 def setUp(self): 1226 self.god = mock.mock_god() 1227 1228 1229 def tearDown(self): 1230 self.god.unstub_all() 1231 1232 1233 def test_delayed_call(self): 1234 test_time = self.god.create_mock_function('time') 1235 test_time.expect_call().and_return(33) 1236 test_time.expect_call().and_return(34.01) 1237 test_time.expect_call().and_return(34.99) 1238 test_time.expect_call().and_return(35.01) 1239 def test_callback(): 1240 test_callback.calls += 1 1241 test_callback.calls = 0 1242 delay_task = monitor_db.DelayedCallTask( 1243 delay_seconds=2, callback=test_callback, 1244 now_func=test_time) # time 33 1245 self.assertEqual(35, delay_task.end_time) 1246 agent = monitor_db.Agent([delay_task], num_processes=0) 1247 self.assert_(not agent.active_task) 1248 agent.tick() # activates the task and polls it once, time 34.01 1249 self.assertEqual(0, test_callback.calls, "callback called early") 1250 agent.tick() # time 34.99 1251 self.assertEqual(0, test_callback.calls, "callback called early") 1252 agent.tick() # time 35.01 1253 self.assertEqual(1, test_callback.calls) 1254 self.assert_(agent.is_done()) 1255 self.assert_(delay_task.is_done()) 1256 self.assert_(delay_task.success) 1257 self.assert_(not delay_task.aborted) 1258 self.god.check_playback() 1259 1260 1261 def test_delayed_call_abort(self): 1262 delay_task = monitor_db.DelayedCallTask( 1263 delay_seconds=987654, callback=lambda : None) 1264 agent = monitor_db.Agent([delay_task], num_processes=0) 1265 agent.abort() 1266 agent.tick() 1267 self.assert_(agent.is_done()) 1268 self.assert_(delay_task.aborted) 1269 self.assert_(delay_task.is_done()) 1270 self.assert_(not delay_task.success) 1271 self.god.check_playback() 1272 1273 1274 1275class AgentTasksTest(BaseSchedulerTest): 1276 ABSPATH_BASE = '/abspath/' 1277 HOSTNAME = 'myhost' 1278 BASE_TASK_DIR = 'hosts/%s/' % HOSTNAME 1279 RESULTS_DIR = '/results/dir' 1280 DUMMY_PROCESS = object() 1281 HOST_PROTECTION = host_protections.default 1282 PIDFILE_ID = object() 1283 JOB_OWNER = 'test_owner' 1284 JOB_NAME = 'test_job_name' 1285 JOB_AUTOSERV_PARAMS = set(['-u', JOB_OWNER, '-l', JOB_NAME]) 1286 1287 def setUp(self): 1288 super(AgentTasksTest, self).setUp() 1289 self.god = mock.mock_god() 1290 self.god.stub_with(drone_manager.DroneManager, 'get_temporary_path', 1291 mock.mock_function('get_temporary_path', 1292 default_return_val='tempdir')) 1293 self.god.stub_function(drone_manager.DroneManager, 1294 'copy_results_on_drone') 1295 self.god.stub_function(drone_manager.DroneManager, 1296 'copy_to_results_repository') 1297 self.god.stub_function(drone_manager.DroneManager, 1298 'get_pidfile_id_from') 1299 1300 def dummy_absolute_path(drone_manager_self, path): 1301 return self.ABSPATH_BASE + path 1302 self.god.stub_with(drone_manager.DroneManager, 'absolute_path', 1303 dummy_absolute_path) 1304 1305 self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'run') 1306 self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'exit_code') 1307 self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'kill') 1308 self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'get_process') 1309 def mock_has_process(unused): 1310 return True 1311 self.god.stub_with(monitor_db.PidfileRunMonitor, 'has_process', 1312 mock_has_process) 1313 1314 self.host = self.god.create_mock_class(monitor_db.Host, 'host') 1315 self.host.id = 1 1316 self.host.hostname = self.HOSTNAME 1317 self.host.protection = self.HOST_PROTECTION 1318 1319 # For this (and other model creations), we must create the entry this 1320 # way; otherwise, if an entry matching the id already exists, Django 1.0 1321 # will raise an exception complaining about a duplicate primary key. 1322 # This way, Django performs an UPDATE query if an entry matching the id 1323 # already exists. 1324 host = models.Host(id=self.host.id, hostname=self.host.hostname, 1325 protection=self.host.protection) 1326 host.save() 1327 1328 self.job = self.god.create_mock_class(monitor_db.Job, 'job') 1329 self.job.owner = self.JOB_OWNER 1330 self.job.name = self.JOB_NAME 1331 self.job.id = 1337 1332 self.job.tag = lambda: 'fake-job-tag' 1333 job = models.Job(id=self.job.id, owner=self.job.owner, 1334 name=self.job.name, created_on=datetime.datetime.now()) 1335 job.save() 1336 1337 self.queue_entry = self.god.create_mock_class( 1338 monitor_db.HostQueueEntry, 'queue_entry') 1339 self.queue_entry.id = 1 1340 self.queue_entry.job = self.job 1341 self.queue_entry.host = self.host 1342 self.queue_entry.meta_host = None 1343 queue_entry = models.HostQueueEntry(id=self.queue_entry.id, job=job, 1344 host=host, meta_host=None) 1345 queue_entry.save() 1346 1347 self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher, 1348 'dispatcher') 1349 1350 1351 def tearDown(self): 1352 super(AgentTasksTest, self).tearDown() 1353 self.god.unstub_all() 1354 1355 1356 def run_task(self, task, success): 1357 """ 1358 Do essentially what an Agent would do, but protect againt 1359 infinite looping from test errors. 1360 """ 1361 if not getattr(task, 'agent', None): 1362 task.agent = object() 1363 count = 0 1364 while not task.is_done(): 1365 count += 1 1366 if count > 10: 1367 print 'Task failed to finish' 1368 # in case the playback has clues to why it 1369 # failed 1370 self.god.check_playback() 1371 self.fail() 1372 task.poll() 1373 self.assertEquals(task.success, success) 1374 1375 1376 def setup_run_monitor(self, exit_status, task_tag, copy_log_file=True, 1377 aborted=False): 1378 monitor_db.PidfileRunMonitor.run.expect_call( 1379 mock.is_instance_comparator(list), 1380 self.BASE_TASK_DIR + task_tag, 1381 nice_level=monitor_db.AUTOSERV_NICE_LEVEL, 1382 log_file=mock.anything_comparator(), 1383 pidfile_name=monitor_db._AUTOSERV_PID_FILE, 1384 paired_with_pidfile=None) 1385 monitor_db.PidfileRunMonitor.exit_code.expect_call() 1386 if aborted: 1387 monitor_db.PidfileRunMonitor.kill.expect_call() 1388 else: 1389 monitor_db.PidfileRunMonitor.exit_code.expect_call().and_return( 1390 exit_status) 1391 1392 if copy_log_file: 1393 self._setup_move_logfile() 1394 1395 1396 def _setup_move_logfile(self, copy_on_drone=False, 1397 include_destination=False): 1398 monitor_db.PidfileRunMonitor.get_process.expect_call().and_return( 1399 self.DUMMY_PROCESS) 1400 if copy_on_drone: 1401 self.queue_entry.execution_path.expect_call().and_return('tag') 1402 drone_manager.DroneManager.copy_results_on_drone.expect_call( 1403 self.DUMMY_PROCESS, source_path=mock.is_string_comparator(), 1404 destination_path=mock.is_string_comparator()) 1405 elif include_destination: 1406 drone_manager.DroneManager.copy_to_results_repository.expect_call( 1407 self.DUMMY_PROCESS, mock.is_string_comparator(), 1408 destination_path=mock.is_string_comparator()) 1409 else: 1410 drone_manager.DroneManager.copy_to_results_repository.expect_call( 1411 self.DUMMY_PROCESS, mock.is_string_comparator()) 1412 1413 1414 def _test_repair_task_helper(self, success, task_tag, queue_entry=None): 1415 self.host.set_status.expect_call('Repairing') 1416 if success: 1417 self.setup_run_monitor(0, task_tag) 1418 self.host.set_status.expect_call('Ready') 1419 else: 1420 self.setup_run_monitor(1, task_tag) 1421 self.host.set_status.expect_call('Repair Failed') 1422 1423 task = monitor_db.RepairTask(self.host, queue_entry=queue_entry) 1424 self.assertEquals(task.failure_tasks, []) 1425 self.run_task(task, success) 1426 1427 expected_protection = host_protections.Protection.get_string( 1428 host_protections.default) 1429 expected_protection = host_protections.Protection.get_attr_name( 1430 expected_protection) 1431 1432 self.assertTrue(set(task.cmd) >= 1433 set([monitor_db._autoserv_path, '-p', '-R', '-m', 1434 self.HOSTNAME, '-r', 1435 drone_manager.WORKING_DIRECTORY, 1436 '--host-protection', expected_protection])) 1437 self.god.check_playback() 1438 1439 1440 def test_repair_task(self): 1441 self._test_repair_task_helper(True, '1-repair') 1442 self._test_repair_task_helper(False, '2-repair') 1443 1444 1445 def test_repair_task_with_hqe_already_requeued(self): 1446 # during recovery, a RepairTask can be passed a queue entry that has 1447 # already been requeued. ensure it leaves the HQE alone in that case. 1448 self.queue_entry.meta_host = 1 1449 self.queue_entry.host = None 1450 self._test_repair_task_helper(False, '1-repair', 1451 queue_entry=self.queue_entry) 1452 1453 1454 def test_recovery_repair_task_working_directory(self): 1455 # ensure that a RepairTask recovering an existing SpecialTask picks up 1456 # the working directory immediately 1457 class MockSpecialTask(object): 1458 def execution_path(self): 1459 return '/my/path' 1460 1461 special_task = MockSpecialTask() 1462 task = monitor_db.RepairTask(self.host, task=special_task) 1463 1464 self.assertEquals(task._working_directory, '/my/path') 1465 1466 1467 def test_repair_task_aborted(self): 1468 self.host.set_status.expect_call('Repairing') 1469 self.setup_run_monitor(0, '1-repair', aborted=True) 1470 1471 task = monitor_db.RepairTask(self.host) 1472 task.agent = object() 1473 task.poll() 1474 task.abort() 1475 1476 self.assertTrue(task.done) 1477 self.assertTrue(task.aborted) 1478 self.assertTrue(task.task.is_complete) 1479 self.assertFalse(task.task.is_active) 1480 self.god.check_playback() 1481 1482 1483 def _test_repair_task_with_queue_entry_helper(self, parse_failed_repair, 1484 task_tag): 1485 self.god.stub_class(monitor_db, 'FinalReparseTask') 1486 self.god.stub_class(monitor_db, 'Agent') 1487 self.god.stub_class_method(monitor_db.TaskWithJobKeyvals, 1488 '_write_keyval_after_job') 1489 agent = DummyAgent() 1490 agent.dispatcher = self._dispatcher 1491 1492 self.host.set_status.expect_call('Repairing') 1493 self.queue_entry.requeue.expect_call() 1494 self.setup_run_monitor(1, task_tag) 1495 self.host.set_status.expect_call('Repair Failed') 1496 self.queue_entry.update_from_database.expect_call() 1497 self.queue_entry.set_execution_subdir.expect_call() 1498 monitor_db.TaskWithJobKeyvals._write_keyval_after_job.expect_call( 1499 'job_queued', mock.is_instance_comparator(int)) 1500 monitor_db.TaskWithJobKeyvals._write_keyval_after_job.expect_call( 1501 'job_finished', mock.is_instance_comparator(int)) 1502 self._setup_move_logfile(copy_on_drone=True) 1503 self.queue_entry.execution_path.expect_call().and_return('tag') 1504 self._setup_move_logfile() 1505 self.job.parse_failed_repair = parse_failed_repair 1506 if parse_failed_repair: 1507 reparse_task = monitor_db.FinalReparseTask.expect_new( 1508 [self.queue_entry]) 1509 reparse_agent = monitor_db.Agent.expect_new([reparse_task], 1510 num_processes=0) 1511 self._dispatcher.add_agent.expect_call(reparse_agent) 1512 self.queue_entry.handle_host_failure.expect_call() 1513 1514 task = monitor_db.RepairTask(self.host, self.queue_entry) 1515 task.agent = agent 1516 self.queue_entry.status = 'Queued' 1517 self.job.created_on = datetime.datetime(2009, 1, 1) 1518 self.run_task(task, False) 1519 self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS) 1520 self.god.check_playback() 1521 1522 1523 def test_repair_task_with_queue_entry(self): 1524 self._test_repair_task_with_queue_entry_helper(True, '1-repair') 1525 self._test_repair_task_with_queue_entry_helper(False, '2-repair') 1526 1527 1528 def setup_verify_expects(self, success, use_queue_entry, task_tag): 1529 if use_queue_entry: 1530 self.queue_entry.set_status.expect_call('Verifying') 1531 self.host.set_status.expect_call('Verifying') 1532 if success: 1533 self.setup_run_monitor(0, task_tag) 1534 self.host.set_status.expect_call('Ready') 1535 else: 1536 self.setup_run_monitor(1, task_tag) 1537 if use_queue_entry and not self.queue_entry.meta_host: 1538 self.queue_entry.set_execution_subdir.expect_call() 1539 self.queue_entry.execution_path.expect_call().and_return('tag') 1540 self._setup_move_logfile(include_destination=True) 1541 1542 1543 def _check_verify_failure_tasks(self, verify_task): 1544 self.assertEquals(len(verify_task.failure_tasks), 1) 1545 repair_task = verify_task.failure_tasks[0] 1546 self.assert_(isinstance(repair_task, monitor_db.RepairTask)) 1547 self.assertEquals(verify_task.host, repair_task.host) 1548 if verify_task.queue_entry: 1549 self.assertEquals(repair_task.queue_entry, 1550 verify_task.queue_entry) 1551 else: 1552 self.assertEquals(repair_task.queue_entry, None) 1553 1554 1555 def _test_verify_task_helper(self, success, task_tag, use_queue_entry=False, 1556 use_meta_host=False): 1557 self.setup_verify_expects(success, use_queue_entry, task_tag) 1558 1559 if use_queue_entry: 1560 task = monitor_db.VerifyTask(queue_entry=self.queue_entry) 1561 else: 1562 task = monitor_db.VerifyTask(host=self.host) 1563 self._check_verify_failure_tasks(task) 1564 self.run_task(task, success) 1565 self.assertTrue(set(task.cmd) >= 1566 set([monitor_db._autoserv_path, '-p', '-v', '-m', 1567 self.HOSTNAME, '-r', 1568 drone_manager.WORKING_DIRECTORY])) 1569 if use_queue_entry: 1570 self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS) 1571 self.god.check_playback() 1572 1573 1574 def test_verify_task_with_host(self): 1575 self._test_verify_task_helper(True, '1-verify') 1576 self._test_verify_task_helper(False, '2-verify') 1577 1578 1579 def test_verify_task_with_queue_entry(self): 1580 self._test_verify_task_helper(True, '1-verify', use_queue_entry=True) 1581 self._test_verify_task_helper(False, '2-verify', use_queue_entry=True) 1582 1583 1584 def test_verify_task_with_metahost(self): 1585 self.queue_entry.meta_host = 1 1586 self.test_verify_task_with_queue_entry() 1587 1588 1589 def test_specialtask_abort_before_prolog(self): 1590 task = monitor_db.RepairTask(self.host) 1591 task.abort() 1592 self.assertTrue(task.aborted) 1593 1594 1595 def _setup_post_job_task_expects(self, autoserv_success, hqe_status=None, 1596 hqe_aborted=False): 1597 self.queue_entry.execution_path.expect_call().and_return('tag') 1598 self.pidfile_monitor = monitor_db.PidfileRunMonitor.expect_new() 1599 self.pidfile_monitor.pidfile_id = self.PIDFILE_ID 1600 self.pidfile_monitor.attach_to_existing_process.expect_call('tag') 1601 if autoserv_success: 1602 code = 0 1603 else: 1604 code = 1 1605 self.queue_entry.update_from_database.expect_call() 1606 self.queue_entry.aborted = hqe_aborted 1607 if not hqe_aborted: 1608 self.pidfile_monitor.exit_code.expect_call().and_return(code) 1609 1610 if hqe_status: 1611 self.queue_entry.set_status.expect_call(hqe_status) 1612 1613 1614 def _setup_pre_parse_expects(self, autoserv_success): 1615 self._setup_post_job_task_expects(autoserv_success, 'Parsing') 1616 1617 1618 def _setup_post_parse_expects(self, autoserv_success): 1619 if autoserv_success: 1620 status = 'Completed' 1621 else: 1622 status = 'Failed' 1623 self.queue_entry.set_status.expect_call(status) 1624 1625 1626 def _expect_execute_run_monitor(self): 1627 self.monitor.exit_code.expect_call() 1628 self.monitor.exit_code.expect_call().and_return(0) 1629 self._expect_copy_results() 1630 1631 1632 def _setup_post_job_run_monitor(self, pidfile_name): 1633 self.pidfile_monitor.has_process.expect_call().and_return(True) 1634 autoserv_pidfile_id = object() 1635 self.monitor = monitor_db.PidfileRunMonitor.expect_new() 1636 self.monitor.run.expect_call( 1637 mock.is_instance_comparator(list), 1638 'tag', 1639 nice_level=monitor_db.AUTOSERV_NICE_LEVEL, 1640 log_file=mock.anything_comparator(), 1641 pidfile_name=pidfile_name, 1642 paired_with_pidfile=self.PIDFILE_ID) 1643 self._expect_execute_run_monitor() 1644 1645 1646 def _expect_copy_results(self, monitor=None, queue_entry=None): 1647 if monitor is None: 1648 monitor = self.monitor 1649 monitor.has_process.expect_call().and_return(True) 1650 if queue_entry: 1651 queue_entry.execution_path.expect_call().and_return('tag') 1652 monitor.get_process.expect_call().and_return(self.DUMMY_PROCESS) 1653 drone_manager.DroneManager.copy_to_results_repository.expect_call( 1654 self.DUMMY_PROCESS, mock.is_string_comparator()) 1655 1656 1657 def _test_final_reparse_task_helper(self, autoserv_success=True): 1658 self._setup_pre_parse_expects(autoserv_success) 1659 self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE) 1660 self._setup_post_parse_expects(autoserv_success) 1661 1662 task = monitor_db.FinalReparseTask([self.queue_entry]) 1663 self.run_task(task, True) 1664 1665 self.god.check_playback() 1666 cmd = [monitor_db._parser_path, '--write-pidfile', '-l', '2', '-r', 1667 '-o', '-P', '/abspath/tag'] 1668 self.assertEquals(task.cmd, cmd) 1669 1670 1671 def test_final_reparse_task(self): 1672 self.god.stub_class(monitor_db, 'PidfileRunMonitor') 1673 self._test_final_reparse_task_helper() 1674 self._test_final_reparse_task_helper(autoserv_success=False) 1675 1676 1677 def test_final_reparse_throttling(self): 1678 self.god.stub_class(monitor_db, 'PidfileRunMonitor') 1679 self.god.stub_function(monitor_db.FinalReparseTask, 1680 '_can_run_new_parse') 1681 1682 self._setup_pre_parse_expects(True) 1683 monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return( 1684 False) 1685 monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return( 1686 True) 1687 self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE) 1688 self._setup_post_parse_expects(True) 1689 1690 task = monitor_db.FinalReparseTask([self.queue_entry]) 1691 self.run_task(task, True) 1692 self.god.check_playback() 1693 1694 1695 def test_final_reparse_recovery(self): 1696 self.god.stub_class(monitor_db, 'PidfileRunMonitor') 1697 self.monitor = self.god.create_mock_class(monitor_db.PidfileRunMonitor, 1698 'run_monitor') 1699 self._setup_post_job_task_expects(True) 1700 self._expect_execute_run_monitor() 1701 self._setup_post_parse_expects(True) 1702 1703 task = monitor_db.FinalReparseTask([self.queue_entry], 1704 recover_run_monitor=self.monitor) 1705 self.run_task(task, True) 1706 self.god.check_playback() 1707 1708 1709 def _setup_gather_logs_expects(self, autoserv_killed=True, 1710 hqe_aborted=False): 1711 self.god.stub_class(monitor_db, 'PidfileRunMonitor') 1712 self.god.stub_class(monitor_db, 'FinalReparseTask') 1713 self._setup_post_job_task_expects(not autoserv_killed, 'Gathering', 1714 hqe_aborted) 1715 if hqe_aborted: 1716 exit_code = None 1717 elif autoserv_killed: 1718 exit_code = 271 1719 else: 1720 exit_code = 0 1721 self.pidfile_monitor.exit_code.expect_call().and_return(exit_code) 1722 if exit_code != 0: 1723 self._setup_post_job_run_monitor('.collect_crashinfo_execute') 1724 self.pidfile_monitor.has_process.expect_call().and_return(True) 1725 self._expect_copy_results(monitor=self.pidfile_monitor, 1726 queue_entry=self.queue_entry) 1727 parse_task = monitor_db.FinalReparseTask.expect_new([self.queue_entry]) 1728 _set_host_and_qe_ids(parse_task) 1729 self._dispatcher.add_agent.expect_call(IsAgentWithTask(parse_task)) 1730 1731 self.pidfile_monitor.num_tests_failed.expect_call().and_return(0) 1732 1733 1734 def _run_gather_logs_task(self): 1735 task = monitor_db.GatherLogsTask(self.job, [self.queue_entry]) 1736 task.agent = DummyAgent() 1737 task.agent.dispatcher = self._dispatcher 1738 self.run_task(task, True) 1739 self.god.check_playback() 1740 1741 1742 def test_gather_logs_task(self): 1743 self._setup_gather_logs_expects() 1744 # no rebooting for this basic test 1745 self.job.reboot_after = models.RebootAfter.NEVER 1746 self.host.set_status.expect_call('Ready') 1747 1748 self._run_gather_logs_task() 1749 1750 1751 def test_gather_logs_task_successful_autoserv(self): 1752 # When Autoserv exits successfully, no collect_crashinfo stage runs 1753 self._setup_gather_logs_expects(autoserv_killed=False) 1754 self.job.reboot_after = models.RebootAfter.NEVER 1755 self.host.set_status.expect_call('Ready') 1756 1757 self._run_gather_logs_task() 1758 1759 1760 def _setup_gather_task_cleanup_expects(self): 1761 self.god.stub_class(monitor_db, 'CleanupTask') 1762 cleanup_task = monitor_db.CleanupTask.expect_new(host=self.host) 1763 _set_host_and_qe_ids(cleanup_task) 1764 self._dispatcher.add_agent.expect_call(IsAgentWithTask(cleanup_task)) 1765 1766 1767 def test_gather_logs_reboot_hosts(self): 1768 self._setup_gather_logs_expects() 1769 self.job.reboot_after = models.RebootAfter.ALWAYS 1770 self._setup_gather_task_cleanup_expects() 1771 1772 self._run_gather_logs_task() 1773 1774 1775 def test_gather_logs_reboot_on_abort(self): 1776 self._setup_gather_logs_expects(hqe_aborted=True) 1777 self.job.reboot_after = models.RebootAfter.NEVER 1778 self._setup_gather_task_cleanup_expects() 1779 1780 self._run_gather_logs_task() 1781 1782 1783 def _test_cleanup_task_helper(self, success, task_tag, 1784 use_queue_entry=False): 1785 if use_queue_entry: 1786 self.queue_entry.get_host.expect_call().and_return(self.host) 1787 self.host.set_status.expect_call('Cleaning') 1788 if success: 1789 self.setup_run_monitor(0, task_tag) 1790 self.host.set_status.expect_call('Ready') 1791 self.host.update_field.expect_call('dirty', 0) 1792 else: 1793 self.setup_run_monitor(1, task_tag) 1794 if use_queue_entry and not self.queue_entry.meta_host: 1795 self.queue_entry.set_execution_subdir.expect_call() 1796 self.queue_entry.execution_path.expect_call().and_return('tag') 1797 self._setup_move_logfile(include_destination=True) 1798 1799 if use_queue_entry: 1800 task = monitor_db.CleanupTask(queue_entry=self.queue_entry) 1801 else: 1802 task = monitor_db.CleanupTask(host=self.host) 1803 self.assertEquals(len(task.failure_tasks), 1) 1804 repair_task = task.failure_tasks[0] 1805 self.assert_(isinstance(repair_task, monitor_db.RepairTask)) 1806 if use_queue_entry: 1807 self.assertEquals(repair_task.queue_entry, self.queue_entry) 1808 1809 self.run_task(task, success) 1810 1811 self.god.check_playback() 1812 self.assert_(set(task.cmd) >= 1813 set([monitor_db._autoserv_path, '-p', '--cleanup', '-m', 1814 self.HOSTNAME, '-r', 1815 drone_manager.WORKING_DIRECTORY])) 1816 if use_queue_entry: 1817 self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS) 1818 1819 def test_cleanup_task(self): 1820 self._test_cleanup_task_helper(True, '1-cleanup') 1821 self._test_cleanup_task_helper(False, '2-cleanup') 1822 1823 1824 def test_cleanup_task_with_queue_entry(self): 1825 self._test_cleanup_task_helper(False, '1-cleanup', True) 1826 1827 1828 def test_recovery_queue_task_aborted_early(self): 1829 # abort a recovery QueueTask right after it's created 1830 self.god.stub_class_method(monitor_db.QueueTask, '_log_abort') 1831 self.god.stub_class_method(monitor_db.QueueTask, '_finish_task') 1832 run_monitor = self.god.create_mock_class(monitor_db.PidfileRunMonitor, 1833 'run_monitor') 1834 1835 self.queue_entry.execution_path.expect_call().and_return('tag') 1836 run_monitor.kill.expect_call() 1837 run_monitor.has_process.expect_call().and_return(True) 1838 monitor_db.QueueTask._log_abort.expect_call() 1839 monitor_db.QueueTask._finish_task.expect_call() 1840 1841 task = monitor_db.QueueTask(self.job, [self.queue_entry], 1842 recover_run_monitor=run_monitor) 1843 task.abort() 1844 self.assert_(task.aborted) 1845 self.god.check_playback() 1846 1847 1848class HostTest(BaseSchedulerTest): 1849 def test_cmp_for_sort(self): 1850 expected_order = [ 1851 'alice', 'Host1', 'host2', 'host3', 'host09', 'HOST010', 1852 'host10', 'host11', 'yolkfolk'] 1853 hostname_idx = list(monitor_db.Host._fields).index('hostname') 1854 row = [None] * len(monitor_db.Host._fields) 1855 hosts = [] 1856 for hostname in expected_order: 1857 row[hostname_idx] = hostname 1858 hosts.append(monitor_db.Host(row=row, new_record=True)) 1859 1860 host1 = hosts[expected_order.index('Host1')] 1861 host010 = hosts[expected_order.index('HOST010')] 1862 host10 = hosts[expected_order.index('host10')] 1863 host3 = hosts[expected_order.index('host3')] 1864 alice = hosts[expected_order.index('alice')] 1865 self.assertEqual(0, monitor_db.Host.cmp_for_sort(host10, host10)) 1866 self.assertEqual(1, monitor_db.Host.cmp_for_sort(host10, host010)) 1867 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host010, host10)) 1868 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host10)) 1869 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host010)) 1870 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host3, host10)) 1871 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host3, host010)) 1872 self.assertEqual(1, monitor_db.Host.cmp_for_sort(host3, host1)) 1873 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host3)) 1874 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(alice, host3)) 1875 self.assertEqual(1, monitor_db.Host.cmp_for_sort(host3, alice)) 1876 self.assertEqual(0, monitor_db.Host.cmp_for_sort(alice, alice)) 1877 1878 hosts.sort(cmp=monitor_db.Host.cmp_for_sort) 1879 self.assertEqual(expected_order, [h.hostname for h in hosts]) 1880 1881 hosts.reverse() 1882 hosts.sort(cmp=monitor_db.Host.cmp_for_sort) 1883 self.assertEqual(expected_order, [h.hostname for h in hosts]) 1884 1885 1886class HostQueueEntryTest(BaseSchedulerTest): 1887 def _create_hqe(self, dependency_labels=(), **create_job_kwargs): 1888 job = self._create_job(**create_job_kwargs) 1889 for label in dependency_labels: 1890 job.dependency_labels.add(label) 1891 hqes = list(monitor_db.HostQueueEntry.fetch(where='job_id=%d' % job.id)) 1892 self.assertEqual(1, len(hqes)) 1893 return hqes[0] 1894 1895 1896 def _check_hqe_labels(self, hqe, expected_labels): 1897 expected_labels = set(expected_labels) 1898 label_names = set(label.name for label in hqe.get_labels()) 1899 self.assertEqual(expected_labels, label_names) 1900 1901 1902 def test_get_labels_empty(self): 1903 hqe = self._create_hqe(hosts=[1]) 1904 labels = list(hqe.get_labels()) 1905 self.assertEqual([], labels) 1906 1907 1908 def test_get_labels_metahost(self): 1909 hqe = self._create_hqe(metahosts=[2]) 1910 self._check_hqe_labels(hqe, ['label2']) 1911 1912 1913 def test_get_labels_dependancies(self): 1914 hqe = self._create_hqe(dependency_labels=(self.label3, self.label4), 1915 metahosts=[1]) 1916 self._check_hqe_labels(hqe, ['label1', 'label3', 'label4']) 1917 1918 1919class JobTest(BaseSchedulerTest): 1920 def setUp(self): 1921 super(JobTest, self).setUp() 1922 self.god.stub_with( 1923 drone_manager.DroneManager, 'attach_file_to_execution', 1924 mock.mock_function('attach_file_to_execution', 1925 default_return_val='/test/path/tmp/foo')) 1926 1927 1928 def _test_pre_job_tasks_helper(self): 1929 """ 1930 Calls HQE._do_pre_run_job_tasks() and returns the task list after 1931 confirming that the last task is the SetEntryPendingTask. 1932 """ 1933 queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next() 1934 pre_job_agent = queue_entry._do_run_pre_job_tasks() 1935 self.assert_(isinstance(pre_job_agent, monitor_db.Agent)) 1936 pre_job_tasks = list(pre_job_agent.queue.queue) 1937 self.assertTrue(isinstance(pre_job_tasks[-1], 1938 monitor_db.SetEntryPendingTask)) 1939 1940 return pre_job_tasks 1941 1942 1943 def _test_run_helper(self, expect_agent=True, expect_starting=False, 1944 expect_pending=False): 1945 if expect_starting: 1946 expected_status = models.HostQueueEntry.Status.STARTING 1947 elif expect_pending: 1948 expected_status = models.HostQueueEntry.Status.PENDING 1949 else: 1950 expected_status = models.HostQueueEntry.Status.VERIFYING 1951 job = monitor_db.Job.fetch('id = 1').next() 1952 queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next() 1953 assert queue_entry.job is job 1954 agent = job.run_if_ready(queue_entry) 1955 1956 self.god.check_playback() 1957 actual_status = models.HostQueueEntry.smart_get(1).status 1958 self.assertEquals(expected_status, actual_status) 1959 1960 if not expect_agent: 1961 self.assertEquals(agent, None) 1962 return 1963 1964 self.assert_(isinstance(agent, monitor_db.Agent)) 1965 tasks = list(agent.queue.queue) 1966 return tasks 1967 1968 1969 def _check_verify_task(self, verify_task): 1970 self.assert_(isinstance(verify_task, monitor_db.VerifyTask)) 1971 self.assertEquals(verify_task.queue_entry.id, 1) 1972 1973 1974 def _check_pending_task(self, pending_task): 1975 self.assert_(isinstance(pending_task, monitor_db.SetEntryPendingTask)) 1976 self.assertEquals(pending_task._queue_entry.id, 1) 1977 1978 1979 def test_run_if_ready_delays(self): 1980 # Also tests Job.run_with_ready_delay() on atomic group jobs. 1981 django_job = self._create_job(hosts=[5, 6], atomic_group=1) 1982 job = monitor_db.Job(django_job.id) 1983 self.assertEqual(1, job.synch_count) 1984 django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id)) 1985 self.assertEqual(2, len(django_hqes)) 1986 self.assertEqual(2, django_hqes[0].atomic_group.max_number_of_machines) 1987 1988 def set_hqe_status(django_hqe, status): 1989 django_hqe.status = status 1990 django_hqe.save() 1991 monitor_db.HostQueueEntry(django_hqe.id).host.set_status(status) 1992 1993 # An initial state, our synch_count is 1 1994 set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.VERIFYING) 1995 set_hqe_status(django_hqes[1], models.HostQueueEntry.Status.PENDING) 1996 1997 # So that we don't depend on the config file value during the test. 1998 self.assert_(scheduler_config.config 1999 .secs_to_wait_for_atomic_group_hosts is not None) 2000 self.god.stub_with(scheduler_config.config, 2001 'secs_to_wait_for_atomic_group_hosts', 123456) 2002 2003 # Get the pending one as a monitor_db.HostQueueEntry object. 2004 pending_hqe = monitor_db.HostQueueEntry(django_hqes[1].id) 2005 self.assert_(not job._delay_ready_task) 2006 self.assertTrue(job.is_ready()) 2007 2008 # Ready with one pending, one verifying and an atomic group should 2009 # result in a DelayCallTask to re-check if we're ready a while later. 2010 agent = job.run_if_ready(pending_hqe) 2011 self.assert_(job._delay_ready_task) 2012 self.assert_(isinstance(agent, monitor_db.Agent)) 2013 tasks = list(agent.queue.queue) 2014 self.assertEqual(1, len(tasks)) 2015 self.assert_(isinstance(tasks[0], monitor_db.DelayedCallTask)) 2016 delay_task = tasks[0] 2017 self.assert_(not delay_task.is_done()) 2018 2019 self.god.stub_function(job, 'run') 2020 2021 # Test that the DelayedCallTask's callback queued up above does the 2022 # correct thing and returns the Agent returned by job.run(). 2023 job.run.expect_call(pending_hqe).and_return('Fake Agent') 2024 self.assertEqual('Fake Agent', delay_task._callback()) 2025 2026 # A delay already exists, this must do nothing. 2027 self.assertEqual(None, job.run_with_ready_delay(pending_hqe)) 2028 2029 # Adjust the delay deadline so that enough time has passed. 2030 job._delay_ready_task.end_time = time.time() - 111111 2031 job.run.expect_call(pending_hqe).and_return('Forty two') 2032 # ...the delay_expired condition should cause us to call run() 2033 self.assertEqual('Forty two', job.run_with_ready_delay(pending_hqe)) 2034 2035 # Adjust the delay deadline back so that enough time has not passed. 2036 job._delay_ready_task.end_time = time.time() + 111111 2037 self.assertEqual(None, job.run_with_ready_delay(pending_hqe)) 2038 2039 set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.PENDING) 2040 # Now max_number_of_machines HQEs are in pending state. Remaining 2041 # delay will now be ignored. 2042 job.run.expect_call(pending_hqe).and_return('Watermelon') 2043 # ...the over_max_threshold test should cause us to call run() 2044 self.assertEqual('Watermelon', job.run_with_ready_delay(pending_hqe)) 2045 2046 other_hqe = monitor_db.HostQueueEntry(django_hqes[0].id) 2047 self.assertTrue(pending_hqe.job is other_hqe.job) 2048 # DBObject classes should reuse instances so these should be the same. 2049 self.assertEqual(job, other_hqe.job) 2050 self.assertEqual(other_hqe.job, pending_hqe.job) 2051 # Be sure our delay was not lost during the other_hqe construction. 2052 self.assert_(job._delay_ready_task) 2053 self.assertFalse(job._delay_ready_task.is_done()) 2054 self.assertFalse(job._delay_ready_task.aborted) 2055 2056 # We want the real run() to be called below. 2057 self.god.unstub(job, 'run') 2058 2059 # We pass in the other HQE this time the same way it would happen 2060 # for real when one host finishes verifying and enters pending. 2061 agent = job.run_if_ready(other_hqe) 2062 2063 # The delayed task must be aborted by the actual run() call above. 2064 self.assertTrue(job._delay_ready_task.aborted) 2065 self.assertFalse(job._delay_ready_task.success) 2066 self.assertTrue(job._delay_ready_task.is_done()) 2067 2068 # Check that job run() and _finish_run() were called by the above: 2069 tasks = list(agent.queue.queue) 2070 self.assertEqual(1, len(tasks)) 2071 self.assert_(isinstance(tasks[0], monitor_db.QueueTask)) 2072 # Requery these hqes in order to verify the status from the DB. 2073 django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id)) 2074 for entry in django_hqes: 2075 self.assertEqual(models.HostQueueEntry.Status.STARTING, 2076 entry.status) 2077 2078 # We're already running, but more calls to run_with_ready_delay can 2079 # continue to come in due to straggler hosts enter Pending. Make 2080 # sure we don't do anything. 2081 self.assertEqual(None, job.run_with_ready_delay(pending_hqe)) 2082 2083 2084 def test__atomic_and_has_started__on_atomic(self): 2085 self._create_job(hosts=[5, 6], atomic_group=1) 2086 job = monitor_db.Job.fetch('id = 1').next() 2087 self.assertFalse(job._atomic_and_has_started()) 2088 2089 self._update_hqe("status='Pending'") 2090 self.assertFalse(job._atomic_and_has_started()) 2091 self._update_hqe("status='Verifying'") 2092 self.assertFalse(job._atomic_and_has_started()) 2093 self.assertFalse(job._atomic_and_has_started()) 2094 self._update_hqe("status='Failed'") 2095 self.assertFalse(job._atomic_and_has_started()) 2096 self._update_hqe("status='Stopped'") 2097 self.assertFalse(job._atomic_and_has_started()) 2098 2099 self._update_hqe("status='Starting'") 2100 self.assertTrue(job._atomic_and_has_started()) 2101 self._update_hqe("status='Completed'") 2102 self.assertTrue(job._atomic_and_has_started()) 2103 self._update_hqe("status='Aborted'") 2104 2105 2106 def test__atomic_and_has_started__not_atomic(self): 2107 self._create_job(hosts=[1, 2]) 2108 job = monitor_db.Job.fetch('id = 1').next() 2109 self.assertFalse(job._atomic_and_has_started()) 2110 self._update_hqe("status='Starting'") 2111 self.assertFalse(job._atomic_and_has_started()) 2112 2113 2114 def test_run_asynchronous(self): 2115 self._create_job(hosts=[1, 2]) 2116 2117 tasks = self._test_pre_job_tasks_helper() 2118 2119 self.assertEquals(len(tasks), 2) 2120 verify_task, pending_task = tasks 2121 self._check_verify_task(verify_task) 2122 self._check_pending_task(pending_task) 2123 2124 2125 def test_run_asynchronous_skip_verify(self): 2126 job = self._create_job(hosts=[1, 2]) 2127 job.run_verify = False 2128 job.save() 2129 2130 tasks = self._test_pre_job_tasks_helper() 2131 2132 self.assertEquals(len(tasks), 1) 2133 pending_task = tasks[0] 2134 self._check_pending_task(pending_task) 2135 2136 2137 def test_run_synchronous_verify(self): 2138 self._create_job(hosts=[1, 2], synchronous=True) 2139 2140 tasks = self._test_pre_job_tasks_helper() 2141 self.assertEquals(len(tasks), 2) 2142 verify_task, pending_task = tasks 2143 self._check_verify_task(verify_task) 2144 self._check_pending_task(pending_task) 2145 2146 2147 def test_run_synchronous_skip_verify(self): 2148 job = self._create_job(hosts=[1, 2], synchronous=True) 2149 job.run_verify = False 2150 job.save() 2151 2152 tasks = self._test_pre_job_tasks_helper() 2153 self.assertEquals(len(tasks), 1) 2154 self._check_pending_task(tasks[0]) 2155 2156 2157 def test_run_synchronous_ready(self): 2158 self._create_job(hosts=[1, 2], synchronous=True) 2159 self._update_hqe("status='Pending', execution_subdir=''") 2160 2161 tasks = self._test_run_helper(expect_starting=True) 2162 self.assertEquals(len(tasks), 1) 2163 queue_task = tasks[0] 2164 2165 self.assert_(isinstance(queue_task, monitor_db.QueueTask)) 2166 self.assertEquals(queue_task.job.id, 1) 2167 hqe_ids = [hqe.id for hqe in queue_task.queue_entries] 2168 self.assertEquals(hqe_ids, [1, 2]) 2169 2170 2171 def test_run_atomic_group_already_started(self): 2172 self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True) 2173 self._update_hqe("status='Starting', execution_subdir=''") 2174 2175 job = monitor_db.Job.fetch('id = 1').next() 2176 queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next() 2177 assert queue_entry.job is job 2178 self.assertEqual(None, job.run(queue_entry)) 2179 2180 self.god.check_playback() 2181 2182 2183 def test_run_synchronous_atomic_group_ready(self): 2184 self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True) 2185 self._update_hqe("status='Pending', execution_subdir=''") 2186 2187 tasks = self._test_run_helper(expect_starting=True) 2188 self.assertEquals(len(tasks), 1) 2189 queue_task = tasks[0] 2190 2191 self.assert_(isinstance(queue_task, monitor_db.QueueTask)) 2192 # Atomic group jobs that do not depend on a specific label in the 2193 # atomic group will use the atomic group name as their group name. 2194 self.assertEquals(queue_task.group_name, 'atomic1') 2195 2196 2197 def test_run_synchronous_atomic_group_with_label_ready(self): 2198 job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True) 2199 job.dependency_labels.add(self.label4) 2200 self._update_hqe("status='Pending', execution_subdir=''") 2201 2202 tasks = self._test_run_helper(expect_starting=True) 2203 self.assertEquals(len(tasks), 1) 2204 queue_task = tasks[0] 2205 2206 self.assert_(isinstance(queue_task, monitor_db.QueueTask)) 2207 # Atomic group jobs that also specify a label in the atomic group 2208 # will use the label name as their group name. 2209 self.assertEquals(queue_task.group_name, 'label4') 2210 2211 2212 def test_reboot_before_always(self): 2213 job = self._create_job(hosts=[1]) 2214 job.reboot_before = models.RebootBefore.ALWAYS 2215 job.save() 2216 2217 tasks = self._test_pre_job_tasks_helper() 2218 self.assertEquals(len(tasks), 3) 2219 cleanup_task = tasks[0] 2220 self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask)) 2221 self.assertEquals(cleanup_task.host.id, 1) 2222 2223 2224 def _test_reboot_before_if_dirty_helper(self, expect_reboot): 2225 job = self._create_job(hosts=[1]) 2226 job.reboot_before = models.RebootBefore.IF_DIRTY 2227 job.save() 2228 2229 tasks = self._test_pre_job_tasks_helper() 2230 self.assertEquals(len(tasks), expect_reboot and 3 or 2) 2231 if expect_reboot: 2232 cleanup_task = tasks[0] 2233 self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask)) 2234 self.assertEquals(cleanup_task.host.id, 1) 2235 2236 2237 def test_reboot_before_if_dirty(self): 2238 models.Host.smart_get(1).update_object(dirty=True) 2239 self._test_reboot_before_if_dirty_helper(True) 2240 2241 2242 def test_reboot_before_not_dirty(self): 2243 models.Host.smart_get(1).update_object(dirty=False) 2244 self._test_reboot_before_if_dirty_helper(False) 2245 2246 2247 def test_next_group_name(self): 2248 django_job = self._create_job(metahosts=[1]) 2249 job = monitor_db.Job(id=django_job.id) 2250 self.assertEqual('group0', job._next_group_name()) 2251 2252 for hqe in django_job.hostqueueentry_set.filter(): 2253 hqe.execution_subdir = 'my_rack.group0' 2254 hqe.save() 2255 self.assertEqual('my_rack.group1', job._next_group_name('my/rack')) 2256 2257 2258class TopLevelFunctionsTest(unittest.TestCase): 2259 def setUp(self): 2260 self.god = mock.mock_god() 2261 2262 2263 def tearDown(self): 2264 self.god.unstub_all() 2265 2266 2267 def test_autoserv_command_line(self): 2268 machines = 'abcd12,efgh34' 2269 extra_args = ['-Z', 'hello'] 2270 expected_command_line = [monitor_db._autoserv_path, '-p', 2271 '-m', machines, '-r', 2272 drone_manager.WORKING_DIRECTORY] 2273 2274 command_line = monitor_db._autoserv_command_line(machines, extra_args) 2275 self.assertEqual(expected_command_line + ['--verbose'] + extra_args, 2276 command_line) 2277 2278 class FakeJob(object): 2279 owner = 'Bob' 2280 name = 'fake job name' 2281 id = 1337 2282 2283 class FakeHQE(object): 2284 job = FakeJob 2285 2286 command_line = monitor_db._autoserv_command_line( 2287 machines, extra_args=[], queue_entry=FakeHQE, verbose=False) 2288 self.assertEqual(expected_command_line + 2289 ['-u', FakeJob.owner, '-l', FakeJob.name], 2290 command_line) 2291 2292 2293if __name__ == '__main__': 2294 unittest.main() 2295