#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import mock

import common

from autotest_lib.client.common_lib.test_utils import unittest
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.frontend.afe import models
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.scheduler import host_scheduler
from autotest_lib.scheduler import monitor_db
from autotest_lib.scheduler import rdb
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_testing_utils
from autotest_lib.scheduler import scheduler_models


class QueryManagerTests(rdb_testing_utils.AbstractBaseRDBTester,
                        unittest.TestCase):
    """Verify scheduler behavior when pending jobs are already given hosts."""

    _config_section = 'AUTOTEST_WEB'


    def testPendingQueueEntries(self):
        """Test retrieval of pending queue entries."""
        job = self.create_job(deps=set(['a']))

        # Check that we don't pull the job we just created with only_hostless.
        hostless_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=True)
        self.assertEqual(len(hostless_entries), 0)

        # Check that only_hostless=False pulls new jobs, as always.
        pending_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        self.assertEqual(pending_entries[0].id, job.id)
        self.assertIsNone(pending_entries[0].host_id)


    def testPendingQueueEntriesForShard(self):
        """Test queue entries for shards aren't executed by master scheduler"""
        job1 = self.create_job(deps=set(['a']))
        job2 = self.create_job(deps=set(['b']))
        shard = models.Shard.objects.create()
        # Assign the first job's label to a shard.
        shard.labels.add(job1.dependency_labels.all()[0])

        # Check that we only pull jobs which are not assigned to a shard.
        pending_entries = self.job_query_manager.get_pending_queue_entries()
        self.assertEqual(len(pending_entries), 1)
        self.assertEqual(pending_entries[0].id, job2.id)


    def testHostQueries(self):
        """Verify that the job query manager reports a job's deps and acls."""
        # Create a job and use a JobQueryManager built from the pending
        # queue entries to retrieve its job info.
        self.create_job(
                deps=rdb_testing_utils.DEFAULT_DEPS,
                acls=rdb_testing_utils.DEFAULT_ACLS)
        queue_entries = self._dispatcher._refresh_pending_queue_entries()
        job_manager = rdb_lib.JobQueryManager(queue_entries)
        job_info = job_manager.get_job_info(queue_entries[0])

        default_dep_ids = set(
                label.id for label in self.db_helper.get_labels(
                        name__in=rdb_testing_utils.DEFAULT_DEPS))
        default_acl_ids = set(
                acl.id for acl in self.db_helper.get_acls(
                        name__in=rdb_testing_utils.DEFAULT_ACLS))
        self.assertEqual(set(job_info['deps']), default_dep_ids)
        self.assertEqual(set(job_info['acls']), default_acl_ids)


    def testNewJobsWithHosts(self):
        """Test that we handle inactive hqes with unleased hosts correctly."""
        # Create a job and assign it an unleased host, then check that the
        # HQE becomes active and the host remains assigned to it.
        job = self.create_job(deps=['a'])
        host = self.db_helper.create_host('h1', deps=['a'])
        self.db_helper.add_host_to_job(host, job.id)

        self._dispatcher._refresh_pending_queue_entries()
        self._dispatcher._schedule_new_jobs()

        host = self.db_helper.get_host(hostname='h1')[0]
        self.assertTrue(host.leased)
        self.assertEqual(host.status, models.Host.Status.READY)
        hqes = list(self.db_helper.get_hqes(host_id=host.id))
        self.assertEqual(len(hqes), 1)
        self.assertTrue(hqes[0].active)
        self.assertEqual(hqes[0].status, models.HostQueueEntry.Status.QUEUED)


    def testNewJobsWithInvalidHost(self):
        """Test handling of inactive hqes assigned invalid, unleased hosts."""
        # Create a job and assign it an unleased host whose labels don't
        # match the job's deps, then check that the HQE DOES NOT become
        # active, because we validate the assignment again.
        job = self.create_job(deps=['a'])
        host = self.db_helper.create_host('h1', deps=['b'])
        self.db_helper.add_host_to_job(host, job.id)

        self._dispatcher._refresh_pending_queue_entries()
        self._dispatcher._schedule_new_jobs()

        host = self.db_helper.get_host(hostname='h1')[0]
        self.assertFalse(host.leased)
        self.assertEqual(host.status, models.Host.Status.READY)
        hqes = list(self.db_helper.get_hqes(host_id=host.id))
        self.assertEqual(len(hqes), 1)
        self.assertFalse(hqes[0].active)
        self.assertEqual(hqes[0].status, models.HostQueueEntry.Status.QUEUED)


    def testNewJobsWithLeasedHost(self):
        """Test handling of inactive hqes assigned leased hosts."""
        # Create a job and assign it a leased host, then check that the
        # HQE does not become active through the scheduler, and that the
        # host gets released.
        job = self.create_job(deps=['a'])
        host = self.db_helper.create_host('h1', deps=['b'])
        self.db_helper.add_host_to_job(host, job.id)
        host.leased = 1
        host.save()

        # Patch through mock.patch so the real rdb.batch_acquire_hosts is
        # restored when this test finishes; a bare module-attribute
        # assignment would leak the mock into every test that runs later.
        patcher = mock.patch.object(rdb, 'batch_acquire_hosts')
        acquire_hosts = patcher.start()
        self.addCleanup(patcher.stop)

        self._dispatcher._refresh_pending_queue_entries()
        self._dispatcher._schedule_new_jobs()
        self.assertEqual(acquire_hosts.call_count, 0)

        host = self.db_helper.get_host(hostname='h1')[0]
        self.assertTrue(host.leased)
        self.assertEqual(host.status, models.Host.Status.READY)
        hqes = list(self.db_helper.get_hqes(host_id=host.id))
        self.assertEqual(len(hqes), 1)
        self.assertFalse(hqes[0].active)
        self.assertEqual(hqes[0].status, models.HostQueueEntry.Status.QUEUED)

        self.host_scheduler._release_hosts()
        self.assertFalse(self.db_helper.get_host(hostname='h1')[0].leased)


    def testSpecialTaskOrdering(self):
        """Test priority ordering of special tasks."""

        # Create 2 special tasks, one with and one without an hqe.
        # Activate the hqe and make sure it gets scheduled before the other.
        host = self.db_helper.create_host('h1', deps=['a'])
        job1 = self.create_job(deps=['a'])
        self.db_helper.add_host_to_job(host, job1.id)
        task1 = self.db_helper.create_special_task(job1.id)
        hqe = self.db_helper.get_hqes(job=job1.id)[0]

        # This task has no queue entry.
        self.db_helper.create_special_task(host_id=host.id)

        # Since the hqe task isn't active we get both back.
        tasks = self.job_query_manager.get_prioritized_special_tasks()
        self.assertEqual(tasks[0].queue_entry_id, hqe.id)
        self.assertIsNone(tasks[1].queue_entry_id)

        # Activate the hqe and make sure the hqe task is returned first.
        self.db_helper.update_hqe(hqe.id, active=True)
        tasks = self.job_query_manager.get_prioritized_special_tasks()
        self.assertEqual(tasks[0].id, task1.id)


class HostSchedulerTests(rdb_testing_utils.AbstractBaseRDBTester,
                         unittest.TestCase):
    """Verify host leasing, job scheduling and special task handling."""

    _config_section = 'AUTOTEST_WEB'


    def setUp(self):
        """Run the base fixture setup, then create a fresh HostScheduler."""
        super(HostSchedulerTests, self).setUp()
        self.host_scheduler = host_scheduler.HostScheduler()


    def testSpecialTaskLocking(self):
        """Test that frontend special tasks lease their hosts."""
        # Create multiple tasks with hosts and make sure the hosts get leased.
        host = self.db_helper.create_host('h')
        host1 = self.db_helper.create_host('h1')
        self.db_helper.create_special_task(host_id=host.id)
        self.db_helper.create_special_task(host_id=host1.id)

        self.host_scheduler._lease_hosts_of_frontend_tasks()

        self.assertTrue(self.db_helper.get_host(hostname='h')[0].leased)
        self.assertTrue(self.db_helper.get_host(hostname='h1')[0].leased)


    def testJobScheduling(self):
        """Test new host acquisitions."""
        # Create a job that will find a host through the host scheduler, and
        # make sure the hqe is activated, and a special task is created.
        job = self.create_job(deps=set(['a']))
        host = self.db_helper.create_host('h1', deps=set(['a']))

        self.host_scheduler._schedule_jobs()

        hqe = self.db_helper.get_hqes(job_id=job.id)[0]
        self.assertTrue(hqe.active)
        self.assertEqual(hqe.host_id, host.id)
        self.assertEqual(hqe.status, models.HostQueueEntry.Status.QUEUED)
        task = self.db_helper.get_tasks(queue_entry_id=hqe.id)[0]
        self.assertFalse(task.is_active)
        self.assertEqual(task.host_id, host.id)


    def _check_agent_invariants(self, host, agent):
        """Check that the host has exactly one agent, wrapping the given task.

        @param host: Host whose id is looked up in the dispatcher's
                     _host_agents mapping.
        @param agent: Object whose id must equal the id of the task wrapped
                      by the host's only agent. Callers in this file pass
                      special tasks.

        @return The single agent currently registered for the host.
        """
        host_agents = list(self._dispatcher._host_agents[host.id])
        self.assertEqual(len(host_agents), 1)
        self.assertEqual(host_agents[0].task.task.id, agent.id)
        return host_agents[0]


    def testLeasedFrontendTaskHost(self):
        """Check that we don't schedule a special task on an unleased host."""
        # Create a special task without an hqe and make sure it isn't returned
        # for scheduling till its host is leased.
        host = self.db_helper.create_host('h1', deps=['a'])
        task = self.db_helper.create_special_task(host_id=host.id)

        tasks = self.job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=True)
        self.assertEqual(tasks, [])

        tasks = self.job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=False)
        self.assertEqual(tasks[0].id, task.id)

        self.host_scheduler._lease_hosts_of_frontend_tasks()
        tasks = self.job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=True)
        self.assertEqual(tasks[0].id, task.id)


    def testTickLockStep(self):
        """Check that a frontend task and an hqe never run simultaneously."""

        self.god.stub_with(monitor_db, '_inline_host_acquisition', False)

        # Create a frontend special task against a host.
        host = self.db_helper.create_host('h1', deps=set(['a']))
        frontend_task = self.db_helper.create_special_task(host_id=host.id)
        self._dispatcher._schedule_special_tasks()
        # The frontend special task shouldn't get scheduled on the host till
        # the host is leased.
        self.assertFalse(self._dispatcher.host_has_agent(host))

        # Create a job for the same host and make the host scheduler lease the
        # host out to that job.
        job = self.create_job(deps=set(['a']))
        self.host_scheduler._schedule_jobs()
        hqe = self.db_helper.get_hqes(job_id=job.id)[0]
        tasks = self.job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=True)
        # We should not find the frontend special task, even though its host
        # is now leased, because it's leased by an active hqe.
        self.assertEqual(len(tasks), 1)
        self.assertEqual(tasks[0].queue_entry_id, hqe.id)
        self._dispatcher._schedule_special_tasks()
        self.assertTrue(self._dispatcher.host_has_agent(host))

        # Deactivate the hqe task and make sure the frontend task gets the
        # host.
        task = tasks[0]
        self._dispatcher.remove_agent(self._check_agent_invariants(host, task))
        task.is_complete = 1
        task.is_active = 0
        task.save()
        self.db_helper.update_hqe(hqe.id, active=False)
        self._dispatcher._schedule_special_tasks()
        self.assertTrue(self._dispatcher.host_has_agent(host))
        self._check_agent_invariants(host, frontend_task)

        # Make sure we don't release the host being used by the incomplete
        # task.
        self.host_scheduler._release_hosts()
        host = self.db_helper.get_host(hostname='h1')[0]
        self.assertTrue(host.leased)


class SuiteRecorderTest(rdb_testing_utils.AbstractBaseRDBTester,
                        unittest.TestCase):
    """Test the functionality of SuiteRecorder"""

    _config_section = 'AUTOTEST_WEB'


    def testGetSuiteHostAssignment(self):
        """Test the initialization of SuiteRecord."""
        num = 4
        hosts = [self.db_helper.create_host('h%d' % i,
                                            deps=set(['board:lumpy']))
                 for i in range(num)]
        single_job = self.create_job(deps=set(['a']))
        jobs_1 = self.create_suite(num=2, board='board:lumpy')
        jobs_2 = self.create_suite(num=2, board='board:lumpy')
        # We have 4 hosts, 5 jobs, one job in the second suite won't
        # get a host.
        all_jobs = ([single_job] +
                    [jobs_1[k] for k in jobs_1 if k != 'parent_job'] +
                    [jobs_2[k] for k in jobs_2 if k != 'parent_job'])
        # zip stops at the shorter sequence, so only the first 4 jobs are
        # paired with a host.
        for host, job in zip(hosts, all_jobs):
            self.db_helper.add_host_to_job(host, job.id, activate=True)

        r = host_scheduler.SuiteRecorder(self.job_query_manager)
        self.assertEqual(r.suite_host_num,
                         {jobs_1['parent_job'].id: 2,
                          jobs_2['parent_job'].id: 1})
        self.assertEqual(r.hosts_to_suites,
                         {hosts[1].id: jobs_1['parent_job'].id,
                          hosts[2].id: jobs_1['parent_job'].id,
                          hosts[3].id: jobs_2['parent_job'].id})


    def verify_state(self, recorder, suite_host_num, hosts_to_suites):
        """Verify the suite, host information held by SuiteRecorder.

        @param recorder: A SuiteRecorder object.
        @param suite_host_num: a dict, expected value of suite_host_num.
        @param hosts_to_suites: a dict, expected value of hosts_to_suites.
        """
        self.assertEqual(recorder.suite_host_num, suite_host_num)
        self.assertEqual(recorder.hosts_to_suites, hosts_to_suites)


    def assign_host_to_job(self, host, job, recorder=None):
        """A helper function that adds a host to a job and records it.

        @param host: A Host object.
        @param job: A Job object.
        @param recorder: A SuiteRecorder object to record the assignment.

        @return a HostQueueEntry object that binds the host and job together.
        """
        # Pass the job id, like every other add_host_to_job caller in this
        # file does.
        self.db_helper.add_host_to_job(host, job.id)
        hqe = scheduler_models.HostQueueEntry.fetch(where='job_id=%s',
                                                    params=(job.id,))[0]
        if recorder:
            recorder.record_assignment(hqe)
        return hqe


    def testRecordAssignmentAndRelease(self):
        """Test when a host is assigned to suite"""
        r = host_scheduler.SuiteRecorder(self.job_query_manager)
        self.verify_state(r, {}, {})
        host1 = self.db_helper.create_host('h1')
        host2 = self.db_helper.create_host('h2')
        jobs = self.create_suite(num=2)
        parent_id = jobs['parent_job'].id

        # HQE got a host.
        hqe = self.assign_host_to_job(host1, jobs[0], r)
        self.verify_state(r, {parent_id: 1}, {host1.id: parent_id})
        # Tried to call record_assignment again, nothing should happen.
        r.record_assignment(hqe)
        self.verify_state(r, {parent_id: 1}, {host1.id: parent_id})
        # Second hqe got a host
        self.assign_host_to_job(host2, jobs[1], r)
        self.verify_state(r, {parent_id: 2},
                          {host1.id: parent_id, host2.id: parent_id})
        # Release host1
        r.record_release([host1])
        self.verify_state(r, {parent_id: 1}, {host2.id: parent_id})
        # Release host2
        r.record_release([host2])
        self.verify_state(r, {}, {})


    def testGetMinDuts(self):
        """Test get min dut for suite."""
        host1 = self.db_helper.create_host('h1')
        host2 = self.db_helper.create_host('h2')
        host3 = self.db_helper.create_host('h3')
        jobs = self.create_suite(num=3)
        pid = jobs['parent_job'].id
        # Set min_dut=2 for the suite as a job keyval.
        keyval = models.JobKeyval(
                job_id=pid, key=constants.SUITE_MIN_DUTS_KEY, value=2)
        keyval.save()
        r = host_scheduler.SuiteRecorder(self.job_query_manager)
        # No job has got a host yet, so the min duts to request should equal
        # what's specified in the job keyval.
        self.assertEqual(r.get_min_duts([pid]), {pid: 2})
        self.assign_host_to_job(host1, jobs[0], r)
        self.assertEqual(r.get_min_duts([pid]), {pid: 1})
        self.assign_host_to_job(host2, jobs[1], r)
        self.assertEqual(r.get_min_duts([pid]), {pid: 0})
        self.assign_host_to_job(host3, jobs[2], r)
        self.assertEqual(r.get_min_duts([pid]), {pid: 0})
        r.record_release([host1])
        self.assertEqual(r.get_min_duts([pid]), {pid: 0})
        r.record_release([host2])
        self.assertEqual(r.get_min_duts([pid]), {pid: 1})
        r.record_release([host3])
        self.assertEqual(r.get_min_duts([pid]), {pid: 2})


if __name__ == '__main__':
    unittest.main()