monitor_db_unittest.py revision b6d1662e18d756483d5fd81f4057cae4ef62152c
1#!/usr/bin/python 2 3import unittest, time, subprocess, os, StringIO, tempfile, datetime, shutil 4import common 5import MySQLdb 6from autotest_lib.frontend import setup_django_environment 7from autotest_lib.frontend.afe import frontend_test_utils 8from autotest_lib.client.common_lib import global_config, host_protections 9from autotest_lib.client.common_lib.test_utils import mock 10from autotest_lib.database import database_connection, migrate 11from autotest_lib.frontend import thread_local 12from autotest_lib.frontend.afe import models 13from autotest_lib.scheduler import monitor_db, drone_manager, email_manager 14from autotest_lib.scheduler import scheduler_config 15 16_DEBUG = False 17 18 19class DummyAgent(object): 20 _is_running = False 21 _is_done = False 22 num_processes = 1 23 host_ids = [] 24 queue_entry_ids = [] 25 26 def is_running(self): 27 return self._is_running 28 29 30 def tick(self): 31 self._is_running = True 32 33 34 def is_done(self): 35 return self._is_done 36 37 38 def set_done(self, done): 39 self._is_done = done 40 self._is_running = not done 41 42 43class IsRow(mock.argument_comparator): 44 def __init__(self, row_id): 45 self.row_id = row_id 46 47 48 def is_satisfied_by(self, parameter): 49 return list(parameter)[0] == self.row_id 50 51 52 def __str__(self): 53 return 'row with id %s' % self.row_id 54 55 56class IsAgentWithTask(mock.argument_comparator): 57 def __init__(self, task): 58 self._task = task 59 60 61 def is_satisfied_by(self, parameter): 62 if not isinstance(parameter, monitor_db.Agent): 63 return False 64 tasks = list(parameter.queue.queue) 65 if len(tasks) != 1: 66 return False 67 return tasks[0] == self._task 68 69 70def _set_host_and_qe_ids(agent_or_task, id_list=None): 71 if id_list is None: 72 id_list = [] 73 agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list 74 75 76class BaseSchedulerTest(unittest.TestCase, 77 frontend_test_utils.FrontendTestMixin): 78 _config_section = 'AUTOTEST_WEB' 79 80 def _do_query(self, sql): 81 self._database.execute(sql) 82 83 84 def _set_monitor_stubs(self): 85 # Clear the instance cache as this is a brand new database. 86 monitor_db.DBObject._clear_instance_cache() 87 88 self._database = ( 89 database_connection.DatabaseConnection.get_test_database( 90 self._test_db_file)) 91 self._database.connect() 92 self._database.debug = _DEBUG 93 94 monitor_db._db = self._database 95 monitor_db._drone_manager._results_dir = '/test/path' 96 monitor_db._drone_manager._temporary_directory = '/test/path/tmp' 97 98 99 def setUp(self): 100 self._frontend_common_setup() 101 self._set_monitor_stubs() 102 self._dispatcher = monitor_db.Dispatcher() 103 104 105 def tearDown(self): 106 self._database.disconnect() 107 self._frontend_common_teardown() 108 109 110 def _update_hqe(self, set, where=''): 111 query = 'UPDATE host_queue_entries SET ' + set 112 if where: 113 query += ' WHERE ' + where 114 self._do_query(query) 115 116 117class DBObjectTest(BaseSchedulerTest): 118 # It may seem odd to subclass BaseSchedulerTest for this but it saves us 119 # duplicating some setup work for what we want to test. 120 121 122 def test_compare_fields_in_row(self): 123 host = monitor_db.Host(id=1) 124 fields = list(host._fields) 125 row_data = [getattr(host, fieldname) for fieldname in fields] 126 self.assertEqual({}, host._compare_fields_in_row(row_data)) 127 row_data[fields.index('hostname')] = 'spam' 128 self.assertEqual({'hostname': ('host1', 'spam')}, 129 host._compare_fields_in_row(row_data)) 130 row_data[fields.index('id')] = 23 131 self.assertEqual({'hostname': ('host1', 'spam'), 'id': (1, 23)}, 132 host._compare_fields_in_row(row_data)) 133 134 135 def test_always_query(self): 136 host_a = monitor_db.Host(id=2) 137 self.assertEqual(host_a.hostname, 'host2') 138 self._do_query('UPDATE hosts SET hostname="host2-updated" WHERE id=2') 139 host_b = monitor_db.Host(id=2, always_query=True) 140 self.assert_(host_a is host_b, 'Cached instance not returned.') 141 self.assertEqual(host_a.hostname, 'host2-updated', 142 'Database was not re-queried') 143 144 # If either of these are called, a query was made when it shouldn't be. 145 host_a._compare_fields_in_row = lambda _: self.fail('eek! a query!') 146 host_a._update_fields_from_row = host_a._compare_fields_in_row 147 host_c = monitor_db.Host(id=2, always_query=False) 148 self.assert_(host_a is host_c, 'Cached instance not returned') 149 150 151 def test_delete(self): 152 host = monitor_db.Host(id=3) 153 host.delete() 154 host = self.assertRaises(monitor_db.DBError, monitor_db.Host, id=3, 155 always_query=False) 156 host = self.assertRaises(monitor_db.DBError, monitor_db.Host, id=3, 157 always_query=True) 158 159 def test_save(self): 160 # Dummy Job to avoid creating a one in the HostQueueEntry __init__. 161 class MockJob(object): 162 def __init__(self, id): 163 pass 164 def tag(self): 165 return 'MockJob' 166 self.god.stub_with(monitor_db, 'Job', MockJob) 167 hqe = monitor_db.HostQueueEntry( 168 new_record=True, 169 row=[0, 1, 2, 'Queued', None, 0, 0, 0, '.', None, False, None]) 170 hqe.save() 171 new_id = hqe.id 172 # Force a re-query and verify that the correct data was stored. 173 monitor_db.DBObject._clear_instance_cache() 174 hqe = monitor_db.HostQueueEntry(id=new_id) 175 self.assertEqual(hqe.id, new_id) 176 self.assertEqual(hqe.job_id, 1) 177 self.assertEqual(hqe.host_id, 2) 178 self.assertEqual(hqe.status, 'Queued') 179 self.assertEqual(hqe.meta_host, None) 180 self.assertEqual(hqe.active, False) 181 self.assertEqual(hqe.complete, False) 182 self.assertEqual(hqe.deleted, False) 183 self.assertEqual(hqe.execution_subdir, '.') 184 self.assertEqual(hqe.atomic_group_id, None) 185 self.assertEqual(hqe.started_on, None) 186 187 188class DispatcherSchedulingTest(BaseSchedulerTest): 189 _jobs_scheduled = [] 190 191 192 def tearDown(self): 193 super(DispatcherSchedulingTest, self).tearDown() 194 195 196 def _set_monitor_stubs(self): 197 super(DispatcherSchedulingTest, self)._set_monitor_stubs() 198 199 def job_run_stub(job_self, queue_entry): 200 """Return a dummy for testing. Called by HostQueueEntry.run().""" 201 self._record_job_scheduled(job_self.id, queue_entry.host.id) 202 queue_entry.set_status('Starting') 203 return DummyAgent() 204 205 self.god.stub_with(monitor_db.Job, 'run', job_run_stub) 206 207 def hqe_queue_log_record_stub(self, log_line): 208 """No-Op to avoid calls down to the _drone_manager during tests.""" 209 210 self.god.stub_with(monitor_db.HostQueueEntry, 'queue_log_record', 211 hqe_queue_log_record_stub) 212 213 214 def _record_job_scheduled(self, job_id, host_id): 215 record = (job_id, host_id) 216 self.assert_(record not in self._jobs_scheduled, 217 'Job %d scheduled on host %d twice' % 218 (job_id, host_id)) 219 self._jobs_scheduled.append(record) 220 221 222 def _assert_job_scheduled_on(self, job_id, host_id): 223 record = (job_id, host_id) 224 self.assert_(record in self._jobs_scheduled, 225 'Job %d not scheduled on host %d as expected\n' 226 'Jobs scheduled: %s' % 227 (job_id, host_id, self._jobs_scheduled)) 228 self._jobs_scheduled.remove(record) 229 230 231 def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number): 232 """Assert job was scheduled on exactly number hosts out of a set.""" 233 found = [] 234 for host_id in host_ids: 235 record = (job_id, host_id) 236 if record in self._jobs_scheduled: 237 found.append(record) 238 self._jobs_scheduled.remove(record) 239 if len(found) < number: 240 self.fail('Job %d scheduled on fewer than %d hosts in %s.\n' 241 'Jobs scheduled: %s' % (job_id, number, host_ids, found)) 242 elif len(found) > number: 243 self.fail('Job %d scheduled on more than %d hosts in %s.\n' 244 'Jobs scheduled: %s' % (job_id, number, host_ids, found)) 245 246 247 def _check_for_extra_schedulings(self): 248 if len(self._jobs_scheduled) != 0: 249 self.fail('Extra jobs scheduled: ' + 250 str(self._jobs_scheduled)) 251 252 253 def _convert_jobs_to_metahosts(self, *job_ids): 254 sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')' 255 self._do_query('UPDATE host_queue_entries SET ' 256 'meta_host=host_id, host_id=NULL ' 257 'WHERE job_id IN ' + sql_tuple) 258 259 260 def _lock_host(self, host_id): 261 self._do_query('UPDATE hosts SET locked=1 WHERE id=' + 262 str(host_id)) 263 264 265 def setUp(self): 266 super(DispatcherSchedulingTest, self).setUp() 267 self._jobs_scheduled = [] 268 269 270 def _test_basic_scheduling_helper(self, use_metahosts): 271 'Basic nonmetahost scheduling' 272 self._create_job_simple([1], use_metahosts) 273 self._create_job_simple([2], use_metahosts) 274 self._dispatcher._schedule_new_jobs() 275 self._assert_job_scheduled_on(1, 1) 276 self._assert_job_scheduled_on(2, 2) 277 self._check_for_extra_schedulings() 278 279 280 def _test_priorities_helper(self, use_metahosts): 281 'Test prioritization ordering' 282 self._create_job_simple([1], use_metahosts) 283 self._create_job_simple([2], use_metahosts) 284 self._create_job_simple([1,2], use_metahosts) 285 self._create_job_simple([1], use_metahosts, priority=1) 286 self._dispatcher._schedule_new_jobs() 287 self._assert_job_scheduled_on(4, 1) # higher priority 288 self._assert_job_scheduled_on(2, 2) # earlier job over later 289 self._check_for_extra_schedulings() 290 291 292 def _test_hosts_ready_helper(self, use_metahosts): 293 """ 294 Only hosts that are status=Ready, unlocked and not invalid get 295 scheduled. 296 """ 297 self._create_job_simple([1], use_metahosts) 298 self._do_query('UPDATE hosts SET status="Running" WHERE id=1') 299 self._dispatcher._schedule_new_jobs() 300 self._check_for_extra_schedulings() 301 302 self._do_query('UPDATE hosts SET status="Ready", locked=1 ' 303 'WHERE id=1') 304 self._dispatcher._schedule_new_jobs() 305 self._check_for_extra_schedulings() 306 307 self._do_query('UPDATE hosts SET locked=0, invalid=1 ' 308 'WHERE id=1') 309 self._dispatcher._schedule_new_jobs() 310 if not use_metahosts: 311 self._assert_job_scheduled_on(1, 1) 312 self._check_for_extra_schedulings() 313 314 315 def _test_hosts_idle_helper(self, use_metahosts): 316 'Only idle hosts get scheduled' 317 self._create_job(hosts=[1], active=True) 318 self._create_job_simple([1], use_metahosts) 319 self._dispatcher._schedule_new_jobs() 320 self._check_for_extra_schedulings() 321 322 323 def _test_obey_ACLs_helper(self, use_metahosts): 324 self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1') 325 self._create_job_simple([1], use_metahosts) 326 self._dispatcher._schedule_new_jobs() 327 self._check_for_extra_schedulings() 328 329 330 def test_basic_scheduling(self): 331 self._test_basic_scheduling_helper(False) 332 333 334 def test_priorities(self): 335 self._test_priorities_helper(False) 336 337 338 def test_hosts_ready(self): 339 self._test_hosts_ready_helper(False) 340 341 342 def test_hosts_idle(self): 343 self._test_hosts_idle_helper(False) 344 345 346 def test_obey_ACLs(self): 347 self._test_obey_ACLs_helper(False) 348 349 350 def test_non_metahost_on_invalid_host(self): 351 """ 352 Non-metahost entries can get scheduled on invalid hosts (this is how 353 one-time hosts work). 354 """ 355 self._do_query('UPDATE hosts SET invalid=1') 356 self._test_basic_scheduling_helper(False) 357 358 359 def test_metahost_scheduling(self): 360 """ 361 Basic metahost scheduling 362 """ 363 self._test_basic_scheduling_helper(True) 364 365 366 def test_metahost_priorities(self): 367 self._test_priorities_helper(True) 368 369 370 def test_metahost_hosts_ready(self): 371 self._test_hosts_ready_helper(True) 372 373 374 def test_metahost_hosts_idle(self): 375 self._test_hosts_idle_helper(True) 376 377 378 def test_metahost_obey_ACLs(self): 379 self._test_obey_ACLs_helper(True) 380 381 382 def _setup_test_only_if_needed_labels(self): 383 # apply only_if_needed label3 to host1 384 models.Host.smart_get('host1').labels.add(self.label3) 385 return self._create_job_simple([1], use_metahost=True) 386 387 388 def test_only_if_needed_labels_avoids_host(self): 389 job = self._setup_test_only_if_needed_labels() 390 # if the job doesn't depend on label3, there should be no scheduling 391 self._dispatcher._schedule_new_jobs() 392 self._check_for_extra_schedulings() 393 394 395 def test_only_if_needed_labels_schedules(self): 396 job = self._setup_test_only_if_needed_labels() 397 job.dependency_labels.add(self.label3) 398 self._dispatcher._schedule_new_jobs() 399 self._assert_job_scheduled_on(1, 1) 400 self._check_for_extra_schedulings() 401 402 403 def test_only_if_needed_labels_via_metahost(self): 404 job = self._setup_test_only_if_needed_labels() 405 job.dependency_labels.add(self.label3) 406 # should also work if the metahost is the only_if_needed label 407 self._do_query('DELETE FROM jobs_dependency_labels') 408 self._create_job(metahosts=[3]) 409 self._dispatcher._schedule_new_jobs() 410 self._assert_job_scheduled_on(2, 1) 411 self._check_for_extra_schedulings() 412 413 414 def test_nonmetahost_over_metahost(self): 415 """ 416 Non-metahost entries should take priority over metahost entries 417 for the same host 418 """ 419 self._create_job(metahosts=[1]) 420 self._create_job(hosts=[1]) 421 self._dispatcher._schedule_new_jobs() 422 self._assert_job_scheduled_on(2, 1) 423 self._check_for_extra_schedulings() 424 425 426 def test_metahosts_obey_blocks(self): 427 """ 428 Metahosts can't get scheduled on hosts already scheduled for 429 that job. 430 """ 431 self._create_job(metahosts=[1], hosts=[1]) 432 # make the nonmetahost entry complete, so the metahost can try 433 # to get scheduled 434 self._update_hqe(set='complete = 1', where='host_id=1') 435 self._dispatcher._schedule_new_jobs() 436 self._check_for_extra_schedulings() 437 438 439 # TODO(gps): These should probably live in their own TestCase class 440 # specific to testing HostScheduler methods directly. It was convenient 441 # to put it here for now to share existing test environment setup code. 442 def test_HostScheduler_check_atomic_group_labels(self): 443 normal_job = self._create_job(metahosts=[0]) 444 atomic_job = self._create_job(atomic_group=1) 445 # Indirectly initialize the internal state of the host scheduler. 446 self._dispatcher._refresh_pending_queue_entries() 447 448 atomic_hqe = monitor_db.HostQueueEntry(id=atomic_job.id) 449 normal_hqe = monitor_db.HostQueueEntry(id=normal_job.id) 450 451 host_scheduler = self._dispatcher._host_scheduler 452 self.assertTrue(host_scheduler._check_atomic_group_labels( 453 [self.label4.id], atomic_hqe)) 454 self.assertFalse(host_scheduler._check_atomic_group_labels( 455 [self.label4.id], normal_hqe)) 456 self.assertFalse(host_scheduler._check_atomic_group_labels( 457 [self.label5.id, self.label6.id, self.label7.id], normal_hqe)) 458 self.assertTrue(host_scheduler._check_atomic_group_labels( 459 [self.label4.id, self.label6.id], atomic_hqe)) 460 self.assertRaises(monitor_db.SchedulerError, 461 host_scheduler._check_atomic_group_labels, 462 [self.label4.id, self.label5.id], 463 atomic_hqe) 464 465 466 def test_HostScheduler_get_host_atomic_group_id(self): 467 self._create_job(metahosts=[self.label6.id]) 468 # Indirectly initialize the internal state of the host scheduler. 469 self._dispatcher._refresh_pending_queue_entries() 470 471 # Test the host scheduler 472 host_scheduler = self._dispatcher._host_scheduler 473 self.assertRaises(monitor_db.SchedulerError, 474 host_scheduler._get_host_atomic_group_id, 475 [self.label4.id, self.label5.id]) 476 self.assertEqual(None, host_scheduler._get_host_atomic_group_id([])) 477 self.assertEqual(None, host_scheduler._get_host_atomic_group_id( 478 [self.label3.id, self.label7.id, self.label6.id])) 479 self.assertEqual(1, host_scheduler._get_host_atomic_group_id( 480 [self.label4.id, self.label7.id, self.label6.id])) 481 self.assertEqual(1, host_scheduler._get_host_atomic_group_id( 482 [self.label7.id, self.label5.id])) 483 484 485 def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self): 486 # Create a job scheduled to run on label6. 487 self._create_job(metahosts=[self.label6.id]) 488 self._dispatcher._schedule_new_jobs() 489 # label6 only has hosts that are in atomic groups associated with it, 490 # there should be no scheduling. 491 self._check_for_extra_schedulings() 492 493 494 def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self): 495 # Create a job scheduled to run on label5. This is an atomic group 496 # label but this job does not request atomic group scheduling. 497 self._create_job(metahosts=[self.label5.id]) 498 self._dispatcher._schedule_new_jobs() 499 # label6 only has hosts that are in atomic groups associated with it, 500 # there should be no scheduling. 501 self._check_for_extra_schedulings() 502 503 504 def test_atomic_group_scheduling_basics(self): 505 # Create jobs scheduled to run on an atomic group. 506 job_a = self._create_job(synchronous=True, metahosts=[self.label4.id], 507 atomic_group=1) 508 job_b = self._create_job(synchronous=True, metahosts=[self.label5.id], 509 atomic_group=1) 510 self._dispatcher._schedule_new_jobs() 511 # atomic_group.max_number_of_machines was 2 so we should run on 2. 512 self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2) 513 self._assert_job_scheduled_on(job_b.id, 8) # label5 514 self._assert_job_scheduled_on(job_b.id, 9) # label5 515 self._check_for_extra_schedulings() 516 517 # The three host label4 atomic group still has one host available. 518 # That means a job with a synch_count of 1 asking to be scheduled on 519 # the atomic group can still use the final machine. 520 # 521 # This may seem like a somewhat odd use case. It allows the use of an 522 # atomic group as a set of machines to run smaller jobs within (a set 523 # of hosts configured for use in network tests with eachother perhaps?) 524 onehost_job = self._create_job(atomic_group=1) 525 self._dispatcher._schedule_new_jobs() 526 self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1) 527 self._check_for_extra_schedulings() 528 529 # No more atomic groups have hosts available, no more jobs should 530 # be scheduled. 531 self._create_job(atomic_group=1) 532 self._dispatcher._schedule_new_jobs() 533 self._check_for_extra_schedulings() 534 535 536 def test_atomic_group_scheduling_obeys_acls(self): 537 # Request scheduling on a specific atomic label but be denied by ACLs. 538 self._do_query('DELETE FROM acl_groups_hosts WHERE host_id in (8,9)') 539 job = self._create_job(metahosts=[self.label5.id], atomic_group=1) 540 self._dispatcher._schedule_new_jobs() 541 self._check_for_extra_schedulings() 542 543 544 def test_atomic_group_scheduling_dependency_label_exclude(self): 545 # A dependency label that matches no hosts in the atomic group. 546 job_a = self._create_job(atomic_group=1) 547 job_a.dependency_labels.add(self.label3) 548 self._dispatcher._schedule_new_jobs() 549 self._check_for_extra_schedulings() 550 551 552 def test_atomic_group_scheduling_metahost_dependency_label_exclude(self): 553 # A metahost and dependency label that excludes too many hosts. 554 job_b = self._create_job(synchronous=True, metahosts=[self.label4.id], 555 atomic_group=1) 556 job_b.dependency_labels.add(self.label7) 557 self._dispatcher._schedule_new_jobs() 558 self._check_for_extra_schedulings() 559 560 561 def test_atomic_group_scheduling_dependency_label_match(self): 562 # A dependency label that exists on enough atomic group hosts in only 563 # one of the two atomic group labels. 564 job_c = self._create_job(synchronous=True, atomic_group=1) 565 job_c.dependency_labels.add(self.label7) 566 self._dispatcher._schedule_new_jobs() 567 self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2) 568 self._check_for_extra_schedulings() 569 570 571 def test_atomic_group_scheduling_no_metahost(self): 572 # Force it to schedule on the other group for a reliable test. 573 self._do_query('UPDATE hosts SET invalid=1 WHERE id=9') 574 # An atomic job without a metahost. 575 job = self._create_job(synchronous=True, atomic_group=1) 576 self._dispatcher._schedule_new_jobs() 577 self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2) 578 self._check_for_extra_schedulings() 579 580 581 def test_atomic_group_scheduling_partial_group(self): 582 # Make one host in labels[3] unavailable so that there are only two 583 # hosts left in the group. 584 self._do_query('UPDATE hosts SET status="Repair Failed" WHERE id=5') 585 job = self._create_job(synchronous=True, metahosts=[self.label4.id], 586 atomic_group=1) 587 self._dispatcher._schedule_new_jobs() 588 # Verify that it was scheduled on the 2 ready hosts in that group. 589 self._assert_job_scheduled_on(job.id, 6) 590 self._assert_job_scheduled_on(job.id, 7) 591 self._check_for_extra_schedulings() 592 593 594 def test_atomic_group_scheduling_not_enough_available(self): 595 # Mark some hosts in each atomic group label as not usable. 596 # One host running, another invalid in the first group label. 597 self._do_query('UPDATE hosts SET status="Running" WHERE id=5') 598 self._do_query('UPDATE hosts SET invalid=1 WHERE id=6') 599 # One host invalid in the second group label. 600 self._do_query('UPDATE hosts SET invalid=1 WHERE id=9') 601 # Nothing to schedule when no group label has enough (2) good hosts.. 602 self._create_job(atomic_group=1, synchronous=True) 603 self._dispatcher._schedule_new_jobs() 604 # There are not enough hosts in either atomic group, 605 # No more scheduling should occur. 606 self._check_for_extra_schedulings() 607 608 # Now create an atomic job that has a synch count of 1. It should 609 # schedule on exactly one of the hosts. 610 onehost_job = self._create_job(atomic_group=1) 611 self._dispatcher._schedule_new_jobs() 612 self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1) 613 614 615 def test_atomic_group_scheduling_no_valid_hosts(self): 616 self._do_query('UPDATE hosts SET invalid=1 WHERE id in (8,9)') 617 self._create_job(synchronous=True, metahosts=[self.label5.id], 618 atomic_group=1) 619 self._dispatcher._schedule_new_jobs() 620 # no hosts in the selected group and label are valid. no schedulings. 621 self._check_for_extra_schedulings() 622 623 624 def test_atomic_group_scheduling_metahost_works(self): 625 # Test that atomic group scheduling also obeys metahosts. 626 self._create_job(metahosts=[0], atomic_group=1) 627 self._dispatcher._schedule_new_jobs() 628 # There are no atomic group hosts that also have that metahost. 629 self._check_for_extra_schedulings() 630 631 job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1) 632 self._dispatcher._schedule_new_jobs() 633 self._assert_job_scheduled_on(job_b.id, 8) 634 self._assert_job_scheduled_on(job_b.id, 9) 635 self._check_for_extra_schedulings() 636 637 638 def test_atomic_group_skips_ineligible_hosts(self): 639 # Test hosts marked ineligible for this job are not eligible. 640 # How would this ever happen anyways? 641 job = self._create_job(metahosts=[self.label4.id], atomic_group=1) 642 models.IneligibleHostQueue.objects.create(job=job, host_id=5) 643 models.IneligibleHostQueue.objects.create(job=job, host_id=6) 644 models.IneligibleHostQueue.objects.create(job=job, host_id=7) 645 self._dispatcher._schedule_new_jobs() 646 # No scheduling should occur as all desired hosts were ineligible. 647 self._check_for_extra_schedulings() 648 649 650 def test_atomic_group_scheduling_fail(self): 651 # If synch_count is > the atomic group number of machines, the job 652 # should be aborted immediately. 653 model_job = self._create_job(synchronous=True, atomic_group=1) 654 model_job.synch_count = 4 655 model_job.save() 656 job = monitor_db.Job(id=model_job.id) 657 self._dispatcher._schedule_new_jobs() 658 self._check_for_extra_schedulings() 659 queue_entries = job.get_host_queue_entries() 660 self.assertEqual(1, len(queue_entries)) 661 self.assertEqual(queue_entries[0].status, 662 models.HostQueueEntry.Status.ABORTED) 663 664 665 def test_atomic_group_no_labels_no_scheduling(self): 666 # Never schedule on atomic groups marked invalid. 667 job = self._create_job(metahosts=[self.label5.id], synchronous=True, 668 atomic_group=1) 669 # Deleting an atomic group via the frontend marks it invalid and 670 # removes all label references to the group. The job now references 671 # an invalid atomic group with no labels associated with it. 672 self.label5.atomic_group.invalid = True 673 self.label5.atomic_group.save() 674 self.label5.atomic_group = None 675 self.label5.save() 676 677 self._dispatcher._schedule_new_jobs() 678 self._check_for_extra_schedulings() 679 680 681 def test_schedule_directly_on_atomic_group_host_fail(self): 682 # Scheduling a job directly on hosts in an atomic group must 683 # fail to avoid users inadvertently holding up the use of an 684 # entire atomic group by using the machines individually. 685 job = self._create_job(hosts=[5]) 686 self._dispatcher._schedule_new_jobs() 687 self._check_for_extra_schedulings() 688 689 690 def test_schedule_directly_on_atomic_group_host(self): 691 # Scheduling a job directly on one host in an atomic group will 692 # work when the atomic group is listed on the HQE in addition 693 # to the host (assuming the sync count is 1). 694 job = self._create_job(hosts=[5], atomic_group=1) 695 self._dispatcher._schedule_new_jobs() 696 self._assert_job_scheduled_on(job.id, 5) 697 self._check_for_extra_schedulings() 698 699 700 def test_schedule_directly_on_atomic_group_hosts_sync2(self): 701 job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True) 702 self._dispatcher._schedule_new_jobs() 703 self._assert_job_scheduled_on(job.id, 5) 704 self._assert_job_scheduled_on(job.id, 8) 705 self._check_for_extra_schedulings() 706 707 708 def test_schedule_directly_on_atomic_group_hosts_wrong_group(self): 709 job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True) 710 self._dispatcher._schedule_new_jobs() 711 self._check_for_extra_schedulings() 712 713 714 def test_only_schedule_queued_entries(self): 715 self._create_job(metahosts=[1]) 716 self._update_hqe(set='active=1, host_id=2') 717 self._dispatcher._schedule_new_jobs() 718 self._check_for_extra_schedulings() 719 720 721 def test_no_ready_hosts(self): 722 self._create_job(hosts=[1]) 723 self._do_query('UPDATE hosts SET status="Repair Failed"') 724 self._dispatcher._schedule_new_jobs() 725 self._check_for_extra_schedulings() 726 727 728class DispatcherThrottlingTest(BaseSchedulerTest): 729 """ 730 Test that the dispatcher throttles: 731 * total number of running processes 732 * number of processes started per cycle 733 """ 734 _MAX_RUNNING = 3 735 _MAX_STARTED = 2 736 737 def setUp(self): 738 super(DispatcherThrottlingTest, self).setUp() 739 scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING 740 scheduler_config.config.max_processes_started_per_cycle = ( 741 self._MAX_STARTED) 742 743 def fake_max_runnable_processes(fake_self): 744 running = sum(agent.num_processes 745 for agent in self._agents 746 if agent.is_running()) 747 return self._MAX_RUNNING - running 748 self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes', 749 fake_max_runnable_processes) 750 751 752 def _setup_some_agents(self, num_agents): 753 self._agents = [DummyAgent() for i in xrange(num_agents)] 754 self._dispatcher._agents = list(self._agents) 755 756 757 def _run_a_few_cycles(self): 758 for i in xrange(4): 759 self._dispatcher._handle_agents() 760 761 762 def _assert_agents_started(self, indexes, is_started=True): 763 for i in indexes: 764 self.assert_(self._agents[i].is_running() == is_started, 765 'Agent %d %sstarted' % 766 (i, is_started and 'not ' or '')) 767 768 769 def _assert_agents_not_started(self, indexes): 770 self._assert_agents_started(indexes, False) 771 772 773 def test_throttle_total(self): 774 self._setup_some_agents(4) 775 self._run_a_few_cycles() 776 self._assert_agents_started([0, 1, 2]) 777 self._assert_agents_not_started([3]) 778 779 780 def test_throttle_per_cycle(self): 781 self._setup_some_agents(3) 782 self._dispatcher._handle_agents() 783 self._assert_agents_started([0, 1]) 784 self._assert_agents_not_started([2]) 785 786 787 def test_throttle_with_synchronous(self): 788 self._setup_some_agents(2) 789 self._agents[0].num_processes = 3 790 self._run_a_few_cycles() 791 self._assert_agents_started([0]) 792 self._assert_agents_not_started([1]) 793 794 795 def test_large_agent_starvation(self): 796 """ 797 Ensure large agents don't get starved by lower-priority agents. 798 """ 799 self._setup_some_agents(3) 800 self._agents[1].num_processes = 3 801 self._run_a_few_cycles() 802 self._assert_agents_started([0]) 803 self._assert_agents_not_started([1, 2]) 804 805 self._agents[0].set_done(True) 806 self._run_a_few_cycles() 807 self._assert_agents_started([1]) 808 self._assert_agents_not_started([2]) 809 810 811 def test_zero_process_agent(self): 812 self._setup_some_agents(5) 813 self._agents[4].num_processes = 0 814 self._run_a_few_cycles() 815 self._assert_agents_started([0, 1, 2, 4]) 816 self._assert_agents_not_started([3]) 817 818 819class FindAbortTest(BaseSchedulerTest): 820 """ 821 Test the dispatcher abort functionality. 822 """ 823 def _check_host_agent(self, agent, host_id): 824 self.assert_(isinstance(agent, monitor_db.Agent)) 825 tasks = list(agent.queue.queue) 826 self.assertEquals(len(tasks), 2) 827 cleanup, verify = tasks 828 829 self.assert_(isinstance(cleanup, monitor_db.CleanupTask)) 830 self.assertEquals(cleanup.host.id, host_id) 831 832 self.assert_(isinstance(verify, monitor_db.VerifyTask)) 833 self.assertEquals(verify.host.id, host_id) 834 835 836 def _check_agents(self, agents): 837 agents = list(agents) 838 self.assertEquals(len(agents), 3) 839 self.assertEquals(agents[0], self._agent) 840 self._check_host_agent(agents[1], 1) 841 self._check_host_agent(agents[2], 2) 842 843 844 def _common_setup(self): 845 self._create_job(hosts=[1, 2]) 846 self._update_hqe(set='aborted=1') 847 self._agent = self.god.create_mock_class(monitor_db.Agent, 'old_agent') 848 _set_host_and_qe_ids(self._agent, [1, 2]) 849 self._agent.abort.expect_call() 850 self._agent.abort.expect_call() # gets called once for each HQE 851 self._dispatcher.add_agent(self._agent) 852 853 854 def test_find_aborting(self): 855 self._common_setup() 856 self._dispatcher._find_aborting() 857 self.god.check_playback() 858 859 860 def test_find_aborting_verifying(self): 861 self._common_setup() 862 self._update_hqe(set='active=1, status="Verifying"') 863 864 self._dispatcher._find_aborting() 865 866 self._check_agents(self._dispatcher._agents) 867 self.god.check_playback() 868 869 870class JobTimeoutTest(BaseSchedulerTest): 871 def _test_synch_start_timeout_helper(self, expect_abort, 872 set_created_on=True, set_active=True, 873 set_acl=True): 874 scheduler_config.config.synch_job_start_timeout_minutes = 60 875 job = self._create_job(hosts=[1, 2]) 876 if set_active: 877 hqe = job.hostqueueentry_set.filter(host__id=1)[0] 878 hqe.status = 'Pending' 879 hqe.active = 1 880 hqe.save() 881 882 everyone_acl = models.AclGroup.smart_get('Everyone') 883 host1 = models.Host.smart_get(1) 884 if set_acl: 885 everyone_acl.hosts.add(host1) 886 else: 887 everyone_acl.hosts.remove(host1) 888 889 job.created_on = datetime.datetime.now() 890 if set_created_on: 891 job.created_on -= datetime.timedelta(minutes=100) 892 job.save() 893 894 cleanup = self._dispatcher._periodic_cleanup 895 cleanup._abort_jobs_past_synch_start_timeout() 896 897 for hqe in job.hostqueueentry_set.all(): 898 self.assertEquals(hqe.aborted, expect_abort) 899 900 901 def test_synch_start_timeout_helper(self): 902 # no abort if any of the condition aren't met 903 self._test_synch_start_timeout_helper(False, set_created_on=False) 904 self._test_synch_start_timeout_helper(False, set_active=False) 905 self._test_synch_start_timeout_helper(False, set_acl=False) 906 # abort if all conditions are met 907 self._test_synch_start_timeout_helper(True) 908 909 910class PidfileRunMonitorTest(unittest.TestCase): 911 execution_tag = 'test_tag' 912 pid = 12345 913 process = drone_manager.Process('myhost', pid) 914 num_tests_failed = 1 915 916 def setUp(self): 917 self.god = mock.mock_god() 918 self.mock_drone_manager = self.god.create_mock_class( 919 drone_manager.DroneManager, 'drone_manager') 920 self.god.stub_with(monitor_db, '_drone_manager', 921 self.mock_drone_manager) 922 self.god.stub_function(email_manager.manager, 'enqueue_notify_email') 923 924 self.pidfile_id = object() 925 926 (self.mock_drone_manager.get_pidfile_id_from 927 .expect_call(self.execution_tag, 928 pidfile_name=monitor_db._AUTOSERV_PID_FILE) 929 .and_return(self.pidfile_id)) 930 self.mock_drone_manager.register_pidfile.expect_call(self.pidfile_id) 931 932 self.monitor = monitor_db.PidfileRunMonitor() 933 self.monitor.attach_to_existing_process(self.execution_tag) 934 935 936 def tearDown(self): 937 self.god.unstub_all() 938 939 940 def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None, 941 use_second_read=False): 942 contents = drone_manager.PidfileContents() 943 if pid is not None: 944 contents.process = drone_manager.Process('myhost', pid) 945 contents.exit_status = exit_code 946 contents.num_tests_failed = tests_failed 947 self.mock_drone_manager.get_pidfile_contents.expect_call( 948 self.pidfile_id, use_second_read=use_second_read).and_return( 949 contents) 950 951 952 def set_not_yet_run(self): 953 self.setup_pidfile() 954 955 956 def set_empty_pidfile(self): 957 self.setup_pidfile() 958 959 960 def set_running(self, use_second_read=False): 961 self.setup_pidfile(self.pid, use_second_read=use_second_read) 962 963 964 def set_complete(self, error_code, use_second_read=False): 965 self.setup_pidfile(self.pid, error_code, self.num_tests_failed, 966 use_second_read=use_second_read) 967 968 969 def _check_monitor(self, expected_pid, expected_exit_status, 970 expected_num_tests_failed): 971 if expected_pid is None: 972 self.assertEquals(self.monitor._state.process, None) 973 else: 974 self.assertEquals(self.monitor._state.process.pid, expected_pid) 975 self.assertEquals(self.monitor._state.exit_status, expected_exit_status) 976 self.assertEquals(self.monitor._state.num_tests_failed, 977 expected_num_tests_failed) 978 979 980 self.god.check_playback() 981 982 983 def _test_read_pidfile_helper(self, expected_pid, expected_exit_status, 984 expected_num_tests_failed): 985 self.monitor._read_pidfile() 986 self._check_monitor(expected_pid, expected_exit_status, 987 expected_num_tests_failed) 988 989 990 def _get_expected_tests_failed(self, expected_exit_status): 991 if expected_exit_status is None: 992 expected_tests_failed = None 993 else: 994 expected_tests_failed = self.num_tests_failed 995 return expected_tests_failed 996 997 998 def test_read_pidfile(self): 999 self.set_not_yet_run() 1000 self._test_read_pidfile_helper(None, None, None) 1001 1002 self.set_empty_pidfile() 1003 self._test_read_pidfile_helper(None, None, None) 1004 1005 self.set_running() 1006 self._test_read_pidfile_helper(self.pid, None, None) 1007 1008 self.set_complete(123) 1009 self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed) 1010 1011 1012 def test_read_pidfile_error(self): 1013 self.mock_drone_manager.get_pidfile_contents.expect_call( 1014 self.pidfile_id, use_second_read=False).and_return( 1015 drone_manager.InvalidPidfile('error')) 1016 self.assertRaises(monitor_db.PidfileRunMonitor._PidfileException, 1017 self.monitor._read_pidfile) 1018 self.god.check_playback() 1019 1020 1021 def setup_is_running(self, is_running): 1022 self.mock_drone_manager.is_process_running.expect_call( 1023 self.process).and_return(is_running) 1024 1025 1026 def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status, 1027 expected_num_tests_failed): 1028 self.monitor._get_pidfile_info() 1029 self._check_monitor(expected_pid, expected_exit_status, 1030 expected_num_tests_failed) 1031 1032 1033 def test_get_pidfile_info(self): 1034 """ 1035 normal cases for get_pidfile_info 1036 """ 1037 # running 1038 self.set_running() 1039 self.setup_is_running(True) 1040 self._test_get_pidfile_info_helper(self.pid, None, None) 1041 1042 # exited during check 1043 self.set_running() 1044 self.setup_is_running(False) 1045 self.set_complete(123, use_second_read=True) # pidfile gets read again 1046 self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed) 1047 1048 # completed 1049 self.set_complete(123) 1050 self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed) 1051 1052 1053 def test_get_pidfile_info_running_no_proc(self): 1054 """ 1055 pidfile shows process running, but no proc exists 1056 """ 1057 # running but no proc 1058 self.set_running() 1059 self.setup_is_running(False) 1060 self.set_running(use_second_read=True) 1061 email_manager.manager.enqueue_notify_email.expect_call( 1062 mock.is_string_comparator(), mock.is_string_comparator()) 1063 self._test_get_pidfile_info_helper(self.pid, 1, 0) 1064 self.assertTrue(self.monitor.lost_process) 1065 1066 1067 def test_get_pidfile_info_not_yet_run(self): 1068 """ 1069 pidfile hasn't been written yet 1070 """ 1071 self.set_not_yet_run() 1072 self._test_get_pidfile_info_helper(None, None, None) 1073 1074 1075 def test_process_failed_to_write_pidfile(self): 1076 self.set_not_yet_run() 1077 email_manager.manager.enqueue_notify_email.expect_call( 1078 mock.is_string_comparator(), mock.is_string_comparator()) 1079 self.monitor._start_time = time.time() - monitor_db.PIDFILE_TIMEOUT - 1 1080 self._test_get_pidfile_info_helper(None, 1, 0) 1081 self.assertTrue(self.monitor.lost_process) 1082 1083 1084class AgentTest(unittest.TestCase): 1085 def setUp(self): 1086 self.god = mock.mock_god() 1087 self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher, 1088 'dispatcher') 1089 1090 1091 def tearDown(self): 1092 self.god.unstub_all() 1093 1094 1095 def _create_mock_task(self, name): 1096 task = self.god.create_mock_class(monitor_db.AgentTask, name) 1097 _set_host_and_qe_ids(task) 1098 return task 1099 1100 def _create_agent(self, tasks): 1101 agent = monitor_db.Agent(tasks) 1102 agent.dispatcher = self._dispatcher 1103 return agent 1104 1105 1106 def _finish_agent(self, agent): 1107 while not agent.is_done(): 1108 agent.tick() 1109 1110 1111 def test_agent(self): 1112 task1 = self._create_mock_task('task1') 1113 task2 = self._create_mock_task('task2') 1114 task3 = self._create_mock_task('task3') 1115 task1.poll.expect_call() 1116 task1.is_done.expect_call().and_return(False) 1117 task1.poll.expect_call() 1118 task1.is_done.expect_call().and_return(True) 1119 task1.is_done.expect_call().and_return(True) 1120 task1.success = True 1121 1122 task2.poll.expect_call() 1123 task2.is_done.expect_call().and_return(True) 1124 task2.is_done.expect_call().and_return(True) 1125 task2.success = False 1126 task2.failure_tasks = [task3] 1127 1128 self._dispatcher.add_agent.expect_call(IsAgentWithTask(task3)) 1129 1130 agent = self._create_agent([task1, task2]) 1131 self._finish_agent(agent) 1132 self.god.check_playback() 1133 1134 1135 def _test_agent_abort_helper(self, ignore_abort=False): 1136 task1 = self._create_mock_task('task1') 1137 task2 = self._create_mock_task('task2') 1138 task1.poll.expect_call() 1139 task1.is_done.expect_call().and_return(False) 1140 task1.abort.expect_call() 1141 if ignore_abort: 1142 task1.aborted = False # task ignores abort; execution continues 1143 1144 task1.poll.expect_call() 1145 task1.is_done.expect_call().and_return(True) 1146 task1.is_done.expect_call().and_return(True) 1147 task1.success = True 1148 1149 task2.poll.expect_call() 1150 task2.is_done.expect_call().and_return(True) 1151 task2.is_done.expect_call().and_return(True) 1152 task2.success = True 1153 else: 1154 task1.aborted = True 1155 task2.abort.expect_call() 1156 task2.aborted = True 1157 1158 agent = self._create_agent([task1, task2]) 1159 agent.tick() 1160 agent.abort() 1161 self._finish_agent(agent) 1162 self.god.check_playback() 1163 1164 1165 def test_agent_abort(self): 1166 self._test_agent_abort_helper() 1167 self._test_agent_abort_helper(True) 1168 1169 1170 def _test_agent_abort_before_started_helper(self, ignore_abort=False): 1171 task = self._create_mock_task('task') 1172 task.abort.expect_call() 1173 if ignore_abort: 1174 task.aborted = False 1175 task.poll.expect_call() 1176 task.is_done.expect_call().and_return(True) 1177 task.is_done.expect_call().and_return(True) 1178 task.success = True 1179 else: 1180 task.aborted = True 1181 1182 agent = self._create_agent([task]) 1183 agent.abort() 1184 self._finish_agent(agent) 1185 self.god.check_playback() 1186 1187 1188 def test_agent_abort_before_started(self): 1189 self._test_agent_abort_before_started_helper() 1190 self._test_agent_abort_before_started_helper(True) 1191 1192 1193class AgentTasksTest(unittest.TestCase): 1194 TEMP_DIR = '/abspath/tempdir' 1195 RESULTS_DIR = '/results/dir' 1196 HOSTNAME = 'myhost' 1197 DUMMY_PROCESS = object() 1198 HOST_PROTECTION = host_protections.default 1199 PIDFILE_ID = object() 1200 JOB_OWNER = 'test_owner' 1201 JOB_NAME = 'test_job_name' 1202 JOB_AUTOSERV_PARAMS = set(['-u', JOB_OWNER, '-l', JOB_NAME]) 1203 1204 def setUp(self): 1205 self.god = mock.mock_god() 1206 self.god.stub_with(drone_manager.DroneManager, 'get_temporary_path', 1207 mock.mock_function('get_temporary_path', 1208 default_return_val='tempdir')) 1209 self.god.stub_function(drone_manager.DroneManager, 1210 'copy_results_on_drone') 1211 self.god.stub_function(drone_manager.DroneManager, 1212 'copy_to_results_repository') 1213 self.god.stub_function(drone_manager.DroneManager, 1214 'get_pidfile_id_from') 1215 1216 def dummy_absolute_path(self, path): 1217 return '/abspath/' + path 1218 self.god.stub_with(drone_manager.DroneManager, 'absolute_path', 1219 dummy_absolute_path) 1220 1221 self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'run') 1222 self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'exit_code') 1223 self.god.stub_class_method(monitor_db.PidfileRunMonitor, 'get_process') 1224 def mock_has_process(unused): 1225 return True 1226 self.god.stub_with(monitor_db.PidfileRunMonitor, 'has_process', 1227 mock_has_process) 1228 self.host = self.god.create_mock_class(monitor_db.Host, 'host') 1229 self.host.id = 1 1230 self.host.hostname = self.HOSTNAME 1231 self.host.protection = self.HOST_PROTECTION 1232 self.queue_entry = self.god.create_mock_class( 1233 monitor_db.HostQueueEntry, 'queue_entry') 1234 self.job = self.god.create_mock_class(monitor_db.Job, 'job') 1235 self.job.owner = self.JOB_OWNER 1236 self.job.name = self.JOB_NAME 1237 self.queue_entry.id = 1 1238 self.queue_entry.job = self.job 1239 self.queue_entry.host = self.host 1240 self.queue_entry.meta_host = None 1241 self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher, 1242 'dispatcher') 1243 1244 1245 def tearDown(self): 1246 self.god.unstub_all() 1247 1248 1249 def run_task(self, task, success): 1250 """ 1251 Do essentially what an Agent would do, but protect againt 1252 infinite looping from test errors. 1253 """ 1254 if not getattr(task, 'agent', None): 1255 task.agent = object() 1256 count = 0 1257 while not task.is_done(): 1258 count += 1 1259 if count > 10: 1260 print 'Task failed to finish' 1261 # in case the playback has clues to why it 1262 # failed 1263 self.god.check_playback() 1264 self.fail() 1265 task.poll() 1266 self.assertEquals(task.success, success) 1267 1268 1269 def setup_run_monitor(self, exit_status, copy_log_file=True): 1270 monitor_db.PidfileRunMonitor.run.expect_call( 1271 mock.is_instance_comparator(list), 1272 'tempdir', 1273 nice_level=monitor_db.AUTOSERV_NICE_LEVEL, 1274 log_file=mock.anything_comparator(), 1275 pidfile_name=monitor_db._AUTOSERV_PID_FILE, 1276 paired_with_pidfile=None) 1277 monitor_db.PidfileRunMonitor.exit_code.expect_call() 1278 monitor_db.PidfileRunMonitor.exit_code.expect_call().and_return( 1279 exit_status) 1280 1281 if copy_log_file: 1282 self._setup_move_logfile() 1283 1284 1285 def _setup_move_logfile(self, copy_on_drone=False, 1286 include_destination=False): 1287 monitor_db.PidfileRunMonitor.get_process.expect_call().and_return( 1288 self.DUMMY_PROCESS) 1289 if copy_on_drone: 1290 self.queue_entry.execution_tag.expect_call().and_return('tag') 1291 drone_manager.DroneManager.copy_results_on_drone.expect_call( 1292 self.DUMMY_PROCESS, source_path=mock.is_string_comparator(), 1293 destination_path=mock.is_string_comparator()) 1294 elif include_destination: 1295 drone_manager.DroneManager.copy_to_results_repository.expect_call( 1296 self.DUMMY_PROCESS, mock.is_string_comparator(), 1297 destination_path=mock.is_string_comparator()) 1298 else: 1299 drone_manager.DroneManager.copy_to_results_repository.expect_call( 1300 self.DUMMY_PROCESS, mock.is_string_comparator()) 1301 1302 1303 def _test_repair_task_helper(self, success): 1304 self.host.set_status.expect_call('Repairing') 1305 if success: 1306 self.setup_run_monitor(0) 1307 self.host.set_status.expect_call('Ready') 1308 else: 1309 self.setup_run_monitor(1) 1310 self.host.set_status.expect_call('Repair Failed') 1311 1312 task = monitor_db.RepairTask(self.host) 1313 self.assertEquals(task.failure_tasks, []) 1314 self.run_task(task, success) 1315 1316 expected_protection = host_protections.Protection.get_string( 1317 host_protections.default) 1318 expected_protection = host_protections.Protection.get_attr_name( 1319 expected_protection) 1320 1321 self.assertTrue(set(task.cmd) >= 1322 set([monitor_db._autoserv_path, '-p', '-R', '-m', 1323 self.HOSTNAME, '-r', self.TEMP_DIR, 1324 '--host-protection', expected_protection])) 1325 self.god.check_playback() 1326 1327 1328 def test_repair_task(self): 1329 self._test_repair_task_helper(True) 1330 self._test_repair_task_helper(False) 1331 1332 1333 def _test_repair_task_with_queue_entry_helper(self, parse_failed_repair): 1334 self.god.stub_class(monitor_db, 'FinalReparseTask') 1335 self.god.stub_class(monitor_db, 'Agent') 1336 self.god.stub_class_method(monitor_db.TaskWithJobKeyvals, 1337 '_write_keyval_after_job') 1338 agent = DummyAgent() 1339 agent.dispatcher = self._dispatcher 1340 1341 self.host.set_status.expect_call('Repairing') 1342 self.queue_entry.requeue.expect_call() 1343 self.setup_run_monitor(1) 1344 self.host.set_status.expect_call('Repair Failed') 1345 self.queue_entry.update_from_database.expect_call() 1346 self.queue_entry.set_execution_subdir.expect_call() 1347 monitor_db.TaskWithJobKeyvals._write_keyval_after_job.expect_call( 1348 'job_queued', mock.is_instance_comparator(int)) 1349 monitor_db.TaskWithJobKeyvals._write_keyval_after_job.expect_call( 1350 'job_finished', mock.is_instance_comparator(int)) 1351 self._setup_move_logfile(copy_on_drone=True) 1352 self.queue_entry.execution_tag.expect_call().and_return('tag') 1353 self._setup_move_logfile() 1354 self.job.parse_failed_repair = parse_failed_repair 1355 if parse_failed_repair: 1356 reparse_task = monitor_db.FinalReparseTask.expect_new( 1357 [self.queue_entry]) 1358 reparse_agent = monitor_db.Agent.expect_new([reparse_task], 1359 num_processes=0) 1360 self._dispatcher.add_agent.expect_call(reparse_agent) 1361 self.queue_entry.handle_host_failure.expect_call() 1362 1363 task = monitor_db.RepairTask(self.host, self.queue_entry) 1364 task.agent = agent 1365 self.queue_entry.status = 'Queued' 1366 self.job.created_on = datetime.datetime(2009, 1, 1) 1367 self.run_task(task, False) 1368 self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS) 1369 self.god.check_playback() 1370 1371 1372 def test_repair_task_with_queue_entry(self): 1373 self._test_repair_task_with_queue_entry_helper(True) 1374 self._test_repair_task_with_queue_entry_helper(False) 1375 1376 1377 def setup_verify_expects(self, success, use_queue_entry): 1378 if use_queue_entry: 1379 self.queue_entry.set_status.expect_call('Verifying') 1380 self.host.set_status.expect_call('Verifying') 1381 if success: 1382 self.setup_run_monitor(0) 1383 self.host.set_status.expect_call('Ready') 1384 else: 1385 self.setup_run_monitor(1) 1386 if use_queue_entry and not self.queue_entry.meta_host: 1387 self.queue_entry.set_execution_subdir.expect_call() 1388 self.queue_entry.execution_tag.expect_call().and_return('tag') 1389 self._setup_move_logfile(include_destination=True) 1390 1391 1392 def _check_verify_failure_tasks(self, verify_task): 1393 self.assertEquals(len(verify_task.failure_tasks), 1) 1394 repair_task = verify_task.failure_tasks[0] 1395 self.assert_(isinstance(repair_task, monitor_db.RepairTask)) 1396 self.assertEquals(verify_task.host, repair_task.host) 1397 if verify_task.queue_entry: 1398 self.assertEquals(repair_task.queue_entry_to_fail, 1399 verify_task.queue_entry) 1400 else: 1401 self.assertEquals(repair_task.queue_entry_to_fail, None) 1402 1403 1404 def _test_verify_task_helper(self, success, use_queue_entry=False, 1405 use_meta_host=False): 1406 self.setup_verify_expects(success, use_queue_entry) 1407 1408 if use_queue_entry: 1409 task = monitor_db.VerifyTask(queue_entry=self.queue_entry) 1410 else: 1411 task = monitor_db.VerifyTask(host=self.host) 1412 self._check_verify_failure_tasks(task) 1413 self.run_task(task, success) 1414 self.assertTrue(set(task.cmd) >= 1415 set([monitor_db._autoserv_path, '-p', '-v', '-m', 1416 self.HOSTNAME, '-r', self.TEMP_DIR])) 1417 if use_queue_entry: 1418 self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS) 1419 self.god.check_playback() 1420 1421 1422 def test_verify_task_with_host(self): 1423 self._test_verify_task_helper(True) 1424 self._test_verify_task_helper(False) 1425 1426 1427 def test_verify_task_with_queue_entry(self): 1428 self._test_verify_task_helper(True, use_queue_entry=True) 1429 self._test_verify_task_helper(False, use_queue_entry=True) 1430 1431 1432 def test_verify_task_with_metahost(self): 1433 self.queue_entry.meta_host = 1 1434 self.test_verify_task_with_queue_entry() 1435 1436 1437 def _setup_post_job_task_expects(self, autoserv_success, hqe_status=None, 1438 hqe_aborted=False): 1439 self.queue_entry.execution_tag.expect_call().and_return('tag') 1440 self.pidfile_monitor = monitor_db.PidfileRunMonitor.expect_new() 1441 self.pidfile_monitor.pidfile_id = self.PIDFILE_ID 1442 self.pidfile_monitor.attach_to_existing_process.expect_call('tag') 1443 if autoserv_success: 1444 code = 0 1445 else: 1446 code = 1 1447 self.queue_entry.update_from_database.expect_call() 1448 self.queue_entry.aborted = hqe_aborted 1449 if not hqe_aborted: 1450 self.pidfile_monitor.exit_code.expect_call().and_return(code) 1451 1452 if hqe_status: 1453 self.queue_entry.set_status.expect_call(hqe_status) 1454 1455 1456 def _setup_pre_parse_expects(self, autoserv_success): 1457 self._setup_post_job_task_expects(autoserv_success, 'Parsing') 1458 1459 1460 def _setup_post_parse_expects(self, autoserv_success): 1461 if autoserv_success: 1462 status = 'Completed' 1463 else: 1464 status = 'Failed' 1465 self.queue_entry.set_status.expect_call(status) 1466 1467 1468 def _expect_execute_run_monitor(self): 1469 self.monitor.exit_code.expect_call() 1470 self.monitor.exit_code.expect_call().and_return(0) 1471 self._expect_copy_results() 1472 1473 1474 def _setup_post_job_run_monitor(self, pidfile_name): 1475 self.pidfile_monitor.has_process.expect_call().and_return(True) 1476 autoserv_pidfile_id = object() 1477 self.monitor = monitor_db.PidfileRunMonitor.expect_new() 1478 self.monitor.run.expect_call( 1479 mock.is_instance_comparator(list), 1480 'tag', 1481 nice_level=monitor_db.AUTOSERV_NICE_LEVEL, 1482 log_file=mock.anything_comparator(), 1483 pidfile_name=pidfile_name, 1484 paired_with_pidfile=self.PIDFILE_ID) 1485 self._expect_execute_run_monitor() 1486 1487 1488 def _expect_copy_results(self, monitor=None, queue_entry=None): 1489 if monitor is None: 1490 monitor = self.monitor 1491 monitor.has_process.expect_call().and_return(True) 1492 if queue_entry: 1493 queue_entry.execution_tag.expect_call().and_return('tag') 1494 monitor.get_process.expect_call().and_return(self.DUMMY_PROCESS) 1495 drone_manager.DroneManager.copy_to_results_repository.expect_call( 1496 self.DUMMY_PROCESS, mock.is_string_comparator()) 1497 1498 1499 def _test_final_reparse_task_helper(self, autoserv_success=True): 1500 self._setup_pre_parse_expects(autoserv_success) 1501 self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE) 1502 self._setup_post_parse_expects(autoserv_success) 1503 1504 task = monitor_db.FinalReparseTask([self.queue_entry]) 1505 self.run_task(task, True) 1506 1507 self.god.check_playback() 1508 cmd = [monitor_db._parser_path, '--write-pidfile', '-l', '2', '-r', 1509 '-o', '-P', '/abspath/tag'] 1510 self.assertEquals(task.cmd, cmd) 1511 1512 1513 def test_final_reparse_task(self): 1514 self.god.stub_class(monitor_db, 'PidfileRunMonitor') 1515 self._test_final_reparse_task_helper() 1516 self._test_final_reparse_task_helper(autoserv_success=False) 1517 1518 1519 def test_final_reparse_throttling(self): 1520 self.god.stub_class(monitor_db, 'PidfileRunMonitor') 1521 self.god.stub_function(monitor_db.FinalReparseTask, 1522 '_can_run_new_parse') 1523 1524 self._setup_pre_parse_expects(True) 1525 monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return( 1526 False) 1527 monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return( 1528 True) 1529 self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE) 1530 self._setup_post_parse_expects(True) 1531 1532 task = monitor_db.FinalReparseTask([self.queue_entry]) 1533 self.run_task(task, True) 1534 self.god.check_playback() 1535 1536 1537 def test_final_reparse_recovery(self): 1538 self.god.stub_class(monitor_db, 'PidfileRunMonitor') 1539 self.monitor = self.god.create_mock_class(monitor_db.PidfileRunMonitor, 1540 'run_monitor') 1541 self._setup_post_job_task_expects(True) 1542 self._expect_execute_run_monitor() 1543 self._setup_post_parse_expects(True) 1544 1545 task = monitor_db.FinalReparseTask([self.queue_entry], 1546 run_monitor=self.monitor) 1547 self.run_task(task, True) 1548 self.god.check_playback() 1549 1550 1551 def _setup_gather_logs_expects(self, autoserv_killed=True, 1552 hqe_aborted=False): 1553 self.god.stub_class(monitor_db, 'PidfileRunMonitor') 1554 self.god.stub_class(monitor_db, 'FinalReparseTask') 1555 self._setup_post_job_task_expects(not autoserv_killed, 'Gathering', 1556 hqe_aborted) 1557 if hqe_aborted: 1558 exit_code = None 1559 elif autoserv_killed: 1560 exit_code = 271 1561 else: 1562 exit_code = 0 1563 self.pidfile_monitor.exit_code.expect_call().and_return(exit_code) 1564 if exit_code != 0: 1565 self._setup_post_job_run_monitor('.collect_crashinfo_execute') 1566 self.pidfile_monitor.has_process.expect_call().and_return(True) 1567 self._expect_copy_results(monitor=self.pidfile_monitor, 1568 queue_entry=self.queue_entry) 1569 parse_task = monitor_db.FinalReparseTask.expect_new([self.queue_entry]) 1570 _set_host_and_qe_ids(parse_task) 1571 self._dispatcher.add_agent.expect_call(IsAgentWithTask(parse_task)) 1572 1573 1574 def _run_gather_logs_task(self): 1575 task = monitor_db.GatherLogsTask(self.job, [self.queue_entry]) 1576 task.agent = DummyAgent() 1577 task.agent.dispatcher = self._dispatcher 1578 self.run_task(task, True) 1579 self.god.check_playback() 1580 1581 1582 def test_gather_logs_task(self): 1583 self._setup_gather_logs_expects() 1584 # no rebooting for this basic test 1585 self.job.reboot_after = models.RebootAfter.NEVER 1586 self.host.set_status.expect_call('Ready') 1587 1588 self._run_gather_logs_task() 1589 1590 1591 def test_gather_logs_task_successful_autoserv(self): 1592 # When Autoserv exits successfully, no collect_crashinfo stage runs 1593 self._setup_gather_logs_expects(autoserv_killed=False) 1594 self.job.reboot_after = models.RebootAfter.NEVER 1595 self.host.set_status.expect_call('Ready') 1596 1597 self._run_gather_logs_task() 1598 1599 1600 def _setup_gather_task_cleanup_expects(self): 1601 self.god.stub_class(monitor_db, 'CleanupTask') 1602 cleanup_task = monitor_db.CleanupTask.expect_new(host=self.host) 1603 _set_host_and_qe_ids(cleanup_task) 1604 self._dispatcher.add_agent.expect_call(IsAgentWithTask(cleanup_task)) 1605 1606 1607 def test_gather_logs_reboot_hosts(self): 1608 self._setup_gather_logs_expects() 1609 self.job.reboot_after = models.RebootAfter.ALWAYS 1610 self._setup_gather_task_cleanup_expects() 1611 1612 self._run_gather_logs_task() 1613 1614 1615 def test_gather_logs_reboot_on_abort(self): 1616 self._setup_gather_logs_expects(hqe_aborted=True) 1617 self.job.reboot_after = models.RebootAfter.NEVER 1618 self._setup_gather_task_cleanup_expects() 1619 1620 self._run_gather_logs_task() 1621 1622 1623 def _test_cleanup_task_helper(self, success, use_queue_entry=False): 1624 if use_queue_entry: 1625 self.queue_entry.get_host.expect_call().and_return(self.host) 1626 self.host.set_status.expect_call('Cleaning') 1627 if success: 1628 self.setup_run_monitor(0) 1629 self.host.set_status.expect_call('Ready') 1630 self.host.update_field.expect_call('dirty', 0) 1631 else: 1632 self.setup_run_monitor(1) 1633 if use_queue_entry and not self.queue_entry.meta_host: 1634 self.queue_entry.set_execution_subdir.expect_call() 1635 self.queue_entry.execution_tag.expect_call().and_return('tag') 1636 self._setup_move_logfile(include_destination=True) 1637 1638 if use_queue_entry: 1639 task = monitor_db.CleanupTask(queue_entry=self.queue_entry) 1640 else: 1641 task = monitor_db.CleanupTask(host=self.host) 1642 self.assertEquals(len(task.failure_tasks), 1) 1643 repair_task = task.failure_tasks[0] 1644 self.assert_(isinstance(repair_task, monitor_db.RepairTask)) 1645 if use_queue_entry: 1646 self.assertEquals(repair_task.queue_entry_to_fail, self.queue_entry) 1647 1648 self.run_task(task, success) 1649 1650 self.god.check_playback() 1651 self.assert_(set(task.cmd) >= 1652 set([monitor_db._autoserv_path, '-p', '--cleanup', '-m', 1653 self.HOSTNAME, '-r', self.TEMP_DIR])) 1654 if use_queue_entry: 1655 self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS) 1656 1657 def test_cleanup_task(self): 1658 self._test_cleanup_task_helper(True) 1659 self._test_cleanup_task_helper(False) 1660 1661 1662 def test_cleanup_task_with_queue_entry(self): 1663 self._test_cleanup_task_helper(False, True) 1664 1665 1666 def test_recovery_queue_task_aborted_early(self): 1667 # abort a RecoveryQueueTask right after it's created 1668 self.god.stub_class_method(monitor_db.QueueTask, '_log_abort') 1669 self.god.stub_class_method(monitor_db.QueueTask, '_finish_task') 1670 run_monitor = self.god.create_mock_class(monitor_db.PidfileRunMonitor, 1671 'run_monitor') 1672 1673 self.queue_entry.execution_tag.expect_call().and_return('tag') 1674 run_monitor.kill.expect_call() 1675 run_monitor.has_process.expect_call().and_return(True) 1676 monitor_db.QueueTask._log_abort.expect_call() 1677 monitor_db.QueueTask._finish_task.expect_call() 1678 1679 task = monitor_db.RecoveryQueueTask(self.job, [self.queue_entry], 1680 run_monitor) 1681 task.abort() 1682 self.assert_(task.aborted) 1683 self.god.check_playback() 1684 1685 1686class HostTest(BaseSchedulerTest): 1687 def test_cmp_for_sort(self): 1688 expected_order = [ 1689 'alice', 'Host1', 'host2', 'host3', 'host09', 'HOST010', 1690 'host10', 'host11', 'yolkfolk'] 1691 hostname_idx = list(monitor_db.Host._fields).index('hostname') 1692 row = [None] * len(monitor_db.Host._fields) 1693 hosts = [] 1694 for hostname in expected_order: 1695 row[hostname_idx] = hostname 1696 hosts.append(monitor_db.Host(row=row, new_record=True)) 1697 1698 host1 = hosts[expected_order.index('Host1')] 1699 host010 = hosts[expected_order.index('HOST010')] 1700 host10 = hosts[expected_order.index('host10')] 1701 host3 = hosts[expected_order.index('host3')] 1702 alice = hosts[expected_order.index('alice')] 1703 self.assertEqual(0, monitor_db.Host.cmp_for_sort(host10, host10)) 1704 self.assertEqual(1, monitor_db.Host.cmp_for_sort(host10, host010)) 1705 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host010, host10)) 1706 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host10)) 1707 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host010)) 1708 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host3, host10)) 1709 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host3, host010)) 1710 self.assertEqual(1, monitor_db.Host.cmp_for_sort(host3, host1)) 1711 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host3)) 1712 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(alice, host3)) 1713 self.assertEqual(1, monitor_db.Host.cmp_for_sort(host3, alice)) 1714 self.assertEqual(0, monitor_db.Host.cmp_for_sort(alice, alice)) 1715 1716 hosts.sort(cmp=monitor_db.Host.cmp_for_sort) 1717 self.assertEqual(expected_order, [h.hostname for h in hosts]) 1718 1719 hosts.reverse() 1720 hosts.sort(cmp=monitor_db.Host.cmp_for_sort) 1721 self.assertEqual(expected_order, [h.hostname for h in hosts]) 1722 1723 1724class HostQueueEntryTest(BaseSchedulerTest): 1725 def _create_hqe(self, dependency_labels=(), **create_job_kwargs): 1726 job = self._create_job(**create_job_kwargs) 1727 for label in dependency_labels: 1728 job.dependency_labels.add(label) 1729 hqes = list(monitor_db.HostQueueEntry.fetch(where='job_id=%d' % job.id)) 1730 self.assertEqual(1, len(hqes)) 1731 return hqes[0] 1732 1733 def _check_hqe_labels(self, hqe, expected_labels): 1734 expected_labels = set(expected_labels) 1735 label_names = set(label.name for label in hqe.get_labels()) 1736 self.assertEqual(expected_labels, label_names) 1737 1738 def test_get_labels_empty(self): 1739 hqe = self._create_hqe(hosts=[1]) 1740 labels = list(hqe.get_labels()) 1741 self.assertEqual([], labels) 1742 1743 def test_get_labels_metahost(self): 1744 hqe = self._create_hqe(metahosts=[2]) 1745 self._check_hqe_labels(hqe, ['label2']) 1746 1747 def test_get_labels_dependancies(self): 1748 hqe = self._create_hqe(dependency_labels=(self.label3, self.label4), 1749 metahosts=[1]) 1750 self._check_hqe_labels(hqe, ['label1', 'label3', 'label4']) 1751 1752 1753class JobTest(BaseSchedulerTest): 1754 def setUp(self): 1755 super(JobTest, self).setUp() 1756 self.god.stub_with( 1757 drone_manager.DroneManager, 'attach_file_to_execution', 1758 mock.mock_function('attach_file_to_execution', 1759 default_return_val='/test/path/tmp/foo')) 1760 1761 1762 def _setup_directory_expects(self, execution_subdir): 1763 # XXX(gps): um... this function does -nothing- 1764 job_path = os.path.join('.', '1-my_user') 1765 results_dir = os.path.join(job_path, execution_subdir) 1766 1767 1768 def _test_run_helper(self, expect_agent=True, expect_starting=False, 1769 expect_pending=False): 1770 if expect_starting: 1771 expected_status = models.HostQueueEntry.Status.STARTING 1772 elif expect_pending: 1773 expected_status = models.HostQueueEntry.Status.PENDING 1774 else: 1775 expected_status = models.HostQueueEntry.Status.VERIFYING 1776 job = monitor_db.Job.fetch('id = 1').next() 1777 queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next() 1778 agent = job.run(queue_entry) 1779 1780 self.god.check_playback() 1781 self.assertEquals(models.HostQueueEntry.smart_get(1).status, 1782 expected_status) 1783 1784 if not expect_agent: 1785 self.assertEquals(agent, None) 1786 return 1787 1788 self.assert_(isinstance(agent, monitor_db.Agent)) 1789 tasks = list(agent.queue.queue) 1790 return tasks 1791 1792 1793 def _check_verify_task(self, verify_task): 1794 self.assert_(isinstance(verify_task, monitor_db.VerifyTask)) 1795 self.assertEquals(verify_task.queue_entry.id, 1) 1796 1797 1798 def _check_pending_task(self, pending_task): 1799 self.assert_(isinstance(pending_task, monitor_db.SetEntryPendingTask)) 1800 self.assertEquals(pending_task._queue_entry.id, 1) 1801 1802 1803 def test_run_asynchronous(self): 1804 self._create_job(hosts=[1, 2]) 1805 1806 tasks = self._test_run_helper() 1807 1808 self.assertEquals(len(tasks), 2) 1809 verify_task, pending_task = tasks 1810 self._check_verify_task(verify_task) 1811 self._check_pending_task(pending_task) 1812 1813 1814 def test_run_asynchronous_skip_verify(self): 1815 job = self._create_job(hosts=[1, 2]) 1816 job.run_verify = False 1817 job.save() 1818 self._setup_directory_expects('host1') 1819 1820 tasks = self._test_run_helper() 1821 1822 self.assertEquals(len(tasks), 1) 1823 pending_task = tasks[0] 1824 self._check_pending_task(pending_task) 1825 1826 1827 def test_run_synchronous_verify(self): 1828 self._create_job(hosts=[1, 2], synchronous=True) 1829 1830 tasks = self._test_run_helper() 1831 self.assertEquals(len(tasks), 2) 1832 verify_task, pending_task = tasks 1833 self._check_verify_task(verify_task) 1834 self._check_pending_task(pending_task) 1835 1836 1837 def test_run_synchronous_skip_verify(self): 1838 job = self._create_job(hosts=[1, 2], synchronous=True) 1839 job.run_verify = False 1840 job.save() 1841 1842 tasks = self._test_run_helper() 1843 self.assertEquals(len(tasks), 1) 1844 self._check_pending_task(tasks[0]) 1845 1846 1847 def test_run_synchronous_ready(self): 1848 self._create_job(hosts=[1, 2], synchronous=True) 1849 self._update_hqe("status='Pending', execution_subdir=''") 1850 self._setup_directory_expects('group0') 1851 1852 tasks = self._test_run_helper(expect_starting=True) 1853 self.assertEquals(len(tasks), 1) 1854 queue_task = tasks[0] 1855 1856 self.assert_(isinstance(queue_task, monitor_db.QueueTask)) 1857 self.assertEquals(queue_task.job.id, 1) 1858 hqe_ids = [hqe.id for hqe in queue_task.queue_entries] 1859 self.assertEquals(hqe_ids, [1, 2]) 1860 1861 1862 def test_run_synchronous_atomic_group_ready(self): 1863 self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True) 1864 self._update_hqe("status='Pending', execution_subdir=''") 1865 1866 tasks = self._test_run_helper(expect_starting=True) 1867 self.assertEquals(len(tasks), 1) 1868 queue_task = tasks[0] 1869 1870 self.assert_(isinstance(queue_task, monitor_db.QueueTask)) 1871 # Atomic group jobs that do not a specific label in the atomic group 1872 # will use the atomic group name as their group name. 1873 self.assertEquals(queue_task.group_name, 'atomic1') 1874 1875 1876 def test_run_synchronous_atomic_group_with_label_ready(self): 1877 job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True) 1878 job.dependency_labels.add(self.label4) 1879 self._update_hqe("status='Pending', execution_subdir=''") 1880 1881 tasks = self._test_run_helper(expect_starting=True) 1882 self.assertEquals(len(tasks), 1) 1883 queue_task = tasks[0] 1884 1885 self.assert_(isinstance(queue_task, monitor_db.QueueTask)) 1886 # Atomic group jobs that also specify a label in the atomic group 1887 # will use the label name as their group name. 1888 self.assertEquals(queue_task.group_name, 'label4') 1889 1890 1891 def test_reboot_before_always(self): 1892 job = self._create_job(hosts=[1]) 1893 job.reboot_before = models.RebootBefore.ALWAYS 1894 job.save() 1895 1896 tasks = self._test_run_helper() 1897 self.assertEquals(len(tasks), 3) 1898 cleanup_task = tasks[0] 1899 self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask)) 1900 self.assertEquals(cleanup_task.host.id, 1) 1901 1902 1903 def _test_reboot_before_if_dirty_helper(self, expect_reboot): 1904 job = self._create_job(hosts=[1]) 1905 job.reboot_before = models.RebootBefore.IF_DIRTY 1906 job.save() 1907 1908 tasks = self._test_run_helper() 1909 self.assertEquals(len(tasks), expect_reboot and 3 or 2) 1910 if expect_reboot: 1911 cleanup_task = tasks[0] 1912 self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask)) 1913 self.assertEquals(cleanup_task.host.id, 1) 1914 1915 def test_reboot_before_if_dirty(self): 1916 models.Host.smart_get(1).update_object(dirty=True) 1917 self._test_reboot_before_if_dirty_helper(True) 1918 1919 1920 def test_reboot_before_not_dirty(self): 1921 models.Host.smart_get(1).update_object(dirty=False) 1922 self._test_reboot_before_if_dirty_helper(False) 1923 1924 1925 def test_next_group_name(self): 1926 django_job = self._create_job(metahosts=[1]) 1927 job = monitor_db.Job(id=django_job.id) 1928 self.assertEqual('group0', job._next_group_name()) 1929 1930 for hqe in django_job.hostqueueentry_set.filter(): 1931 hqe.execution_subdir = 'my_rack.group0' 1932 hqe.save() 1933 self.assertEqual('my_rack.group1', job._next_group_name('my/rack')) 1934 1935 1936class TopLevelFunctionsTest(unittest.TestCase): 1937 def test_autoserv_command_line(self): 1938 machines = 'abcd12,efgh34' 1939 results_dir = '/fake/path' 1940 extra_args = ['-Z', 'hello'] 1941 expected_command_line = [monitor_db._autoserv_path, '-p', 1942 '-m', machines, '-r', results_dir] 1943 1944 command_line = monitor_db._autoserv_command_line( 1945 machines, results_dir, extra_args) 1946 self.assertEqual(expected_command_line + extra_args, command_line) 1947 1948 class FakeJob(object): 1949 owner = 'Bob' 1950 name = 'fake job name' 1951 1952 command_line = monitor_db._autoserv_command_line( 1953 machines, results_dir, extra_args=[], job=FakeJob()) 1954 self.assertEqual(expected_command_line + 1955 ['-u', FakeJob.owner, '-l', FakeJob.name], 1956 command_line) 1957 1958 1959if __name__ == '__main__': 1960 unittest.main() 1961