#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Host scheduler.

If run as a standalone service, the host scheduler ensures the following:
    1. Hosts will not be assigned to multiple hqes simultaneously. The process
       of assignment in this case refers to the modification of the host_id
       column of a row in the afe_host_queue_entries table, to reflect the
       host id of a leased host matching the dependencies of the job.
    2. Hosts that are not being used by active hqes or incomplete special
       tasks will be released back to the available hosts pool, for
       acquisition by subsequent hqes.
In addition to these guarantees, the host scheduler also confirms that no two
active hqes/special tasks are assigned the same host, and sets the leased bit
for hosts needed by frontend special tasks. The need for the latter is only
apparent when viewed in the context of the job scheduler (monitor_db), which
runs special tasks only after their hosts have been leased.

** Support for a minimum dut requirement for suites (non-inline mode) **

Each suite can specify the minimum number of duts it requires by
dropping a 'suite_min_duts' job keyval, which defaults to 0.

When suites are competing for duts, if any suite has not yet received the
minimum number of duts it requires, the host scheduler tries to meet that
requirement first, even if other suites have higher priority or earlier
timestamps. Once the minimum dut requirement of every suite has been
fulfilled, the host scheduler allocates the remaining duts based on job
priority and suite job id. This prevents low-priority suites from starving
when they share a pool with high-priority suites.

Note:
    1. Prevent potential starvation:
       We need to carefully choose |suite_min_duts| for both low and high
       priority suites. If a high priority suite didn't specify it but a low
       priority one did, the high priority suite can be starved!
    2. Restart requirement:
       Restart the host scheduler if you manually released a host by setting
       leased=0 in the db. This is needed because the host scheduler maintains
       internal state of the host assignments for suites.
    3. Exchanging duts triggers provisioning:
       TODO(fdeng): There is a chance two suites can exchange duts.
       If the two suites are for different builds, the exchange
       will trigger provisioning. This can be optimized by preferring to get
       hosts with the same build.
"""

import argparse
import collections
import datetime
import logging
import os
import signal
import sys
import time

import common
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment

# This import needs to come earlier to avoid using autotest's version of
# httplib2, which is out of date.
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock

from autotest_lib.client.common_lib import global_config
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_utils
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.site_utils import job_overhead
from autotest_lib.site_utils import server_manager_utils


_db_manager = None
_shutdown = False
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
_METRICS_PREFIX = 'chromeos/autotest/host_scheduler'

class SuiteRecorder(object):
    """Records the host assignments of suites.

    The recorder holds two things:
        * suite_host_num, which records how many duts a suite is holding,
          as a map <suite_job_id -> num_of_hosts>
        * hosts_to_suites, which records which host is assigned to which
          suite, as a map <host_id -> suite_job_id>
    Both data structures are updated when a host is assigned to, or released
    by, a job.

    The reason to maintain hosts_to_suites is that, when a host is released,
    we need to know which suite it was leased to. Querying the db for the
    latest completed job that has run on a host is slow. Therefore, we go with
    an alternative: keeping a <host id, suite job id> map
    in memory (for 10K hosts, the map should take less than 1M of memory on a
    64-bit machine with python 2.7).

    """


    def __init__(self, job_query_manager):
        """Initialize.

        @param job_query_manager: An AFEJobQueryManager object.
        """
        self.job_query_manager = job_query_manager
        self.suite_host_num, self.hosts_to_suites = (
                self.job_query_manager.get_suite_host_assignment())


    def record_assignment(self, queue_entry):
        """Record that the hqe has got a host.

        @param queue_entry: A scheduler_models.HostQueueEntry object which has
                got a host.
        """
        parent_id = queue_entry.job.parent_job_id
        if not parent_id:
            return
        if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id:
            logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
                          'seems already recorded', queue_entry.id,
                          parent_id, queue_entry.host.hostname)
            return
        num_hosts = self.suite_host_num.get(parent_id, 0)
        self.suite_host_num[parent_id] = num_hosts + 1
        self.hosts_to_suites[queue_entry.host_id] = parent_id
        logging.debug('Suite %d got host %s, currently holding %d hosts',
                      parent_id, queue_entry.host.hostname,
                      self.suite_host_num[parent_id])
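
    # A rough illustration of the bookkeeping above (the ids and counts are
    # made up for this sketch, not taken from any real deployment):
    #   suite job 42 holds hosts 7 and 9  ->  suite_host_num  == {42: 2}
    #                                         hosts_to_suites == {7: 42, 9: 42}
    #   host 9 is released                ->  suite_host_num  == {42: 1}
    #                                         hosts_to_suites == {7: 42}
    #   If suite 42 declared suite_min_duts=4, get_min_duts({42}) would then
    #   ask the rdb for max(0, 4 - 1) == 3 more duts on its behalf.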

    def record_release(self, hosts):
        """Update the record with a host releasing event.

        @param hosts: A list of scheduler_models.Host objects.
        """
        for host in hosts:
            if host.id in self.hosts_to_suites:
                parent_job_id = self.hosts_to_suites.pop(host.id)
                count = self.suite_host_num[parent_job_id] - 1
                if count == 0:
                    del self.suite_host_num[parent_job_id]
                else:
                    self.suite_host_num[parent_job_id] = count
                logging.debug(
                        'Suite %d releases host %s, currently holding %d hosts',
                        parent_job_id, host.hostname, count)


    def get_min_duts(self, suite_job_ids):
        """Figure out the minimum number of duts to request.

        Given a set of suite job ids, figure out the minimum number of duts to
        request for each suite. It is determined by two factors: the min_duts
        specified for each suite in its job keyvals, and how many duts the
        suite is currently holding.

        @param suite_job_ids: A set of suite job ids.

        @returns: A dictionary; the key is a suite_job_id and the value
                  is the minimum number of duts to request.
        """
        suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
                suite_job_ids)
        for parent_id in suite_job_ids:
            min_duts = suite_min_duts.get(parent_id, 0)
            cur_duts = self.suite_host_num.get(parent_id, 0)
            suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
        logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
                      suite_min_duts)
        return suite_min_duts


class BaseHostScheduler(object):
    """Base class containing host acquisition logic.

    This class contains all the core host acquisition logic needed by the
    scheduler to run jobs on hosts. It is only capable of releasing hosts
    back to the rdb through its tick; any other action must be instigated by
    the job scheduler.
    """


    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])


    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()


    def _release_hosts(self):
        """Release hosts to the RDB.

        Release all hosts that are ready, are currently not being used by an
        active hqe, and don't have a new special task scheduled against them.

        @return: A list of the hosts that were released.
        """
        release_hosts = self.host_query_manager.find_unused_healty_hosts()
        release_hostnames = [host.hostname for host in release_hosts]
        if release_hostnames:
            self.host_query_manager.set_leased(
                    False, hostname__in=release_hostnames)
        return release_hosts


    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.

        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain of
        events, culminating in running the test and collecting results from
        the host.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.
        """
        if queue_entry.host_id is None:
            queue_entry.set_host(host)
        elif host.id != queue_entry.host_id:
            raise rdb_utils.RDBException('The rdb returned host: %s '
                    'but the job: %s was already assigned a host: %s. ' %
                    (host.hostname, queue_entry.job_id,
                     queue_entry.host.hostname))
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion we use to release a host depends
        # on it not being used by an active hqe.
        # Since we're activating the hqe here, we also need to schedule its
        # first prejob task. OTOH, we could converge to having the host
        # scheduler manage all special tasks, since their only use today is
        # to verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def acquire_hosts(self, host_jobs):
        """Acquire hosts for the given jobs.

        This method sends jobs that need hosts to the rdb.
        Subclasses can override this method to pipe more args
        to the rdb.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.

        @return: A generator that yields an rdb_hosts.RDBClientHostWrapper
                 for each host acquired on behalf of a queue_entry,
                 or None if a host wasn't found.
        """
        return rdb_lib.acquire_hosts(host_jobs)


    def find_hosts_for_jobs(self, host_jobs):
        """Find and verify hosts for a list of jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A generator of tuples of the form (host, queue_entry) for
                 each valid host-queue_entry assignment.
        """
        hosts = self.acquire_hosts(host_jobs)
        for host, job in zip(hosts, host_jobs):
            if host:
                yield self.host_assignment(host, job)


    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()


class HostScheduler(BaseHostScheduler):
    """A scheduler capable of managing host acquisition for new jobs."""


    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()
        # Keeps track of how many hosts each suite is holding:
        # {suite_job_id: num_hosts}
        self._suite_recorder = SuiteRecorder(self.job_query_manager)


    def _record_host_assignment(self, host, queue_entry):
        """Record that |host| is assigned to |queue_entry|.

        Record:
            1. How long it took to assign a host to the job, in the metadata
               db.
            2. The host assignment of the suite.

        @param host: A Host object.
        @param queue_entry: A HostQueueEntry object.
        """
        secs_in_queued = (datetime.datetime.now() -
                          queue_entry.job.created_on).total_seconds()
        job_overhead.record_state_duration(
                queue_entry.job_id, host.hostname,
                job_overhead.STATUS.QUEUED, secs_in_queued)
        self._suite_recorder.record_assignment(queue_entry)


    @metrics.SecondsTimerDecorator(
            '%s/schedule_jobs_duration' % _METRICS_PREFIX)
    def _schedule_jobs(self):
        """Schedule new jobs against hosts."""

        new_jobs_with_hosts = 0
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        if unverified_host_jobs:
            for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
                self.schedule_host_job(acquisition.host, acquisition.job)
                self._record_host_assignment(acquisition.host, acquisition.job)
                new_jobs_with_hosts += 1
            metrics.Counter('%s/new_jobs_with_hosts' % _METRICS_PREFIX
                            ).increment_by(new_jobs_with_hosts)

            num_jobs_without_hosts = (len(unverified_host_jobs) -
                                      new_jobs_with_hosts)
            metrics.Gauge('%s/current_jobs_without_hosts' % _METRICS_PREFIX
                          ).set(num_jobs_without_hosts)

        metrics.Counter('%s/tick' % _METRICS_PREFIX).increment()

    @metrics.SecondsTimerDecorator('%s/lease_hosts_duration' % _METRICS_PREFIX)
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # We really don't need to get all the special tasks here, just the
        # ones without hqes, but reusing the method used by the scheduler
        # ensures we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                        only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing an already leased host here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        #    task has already leased a host and we lease it again, the host
        #    scheduler won't release the host till both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)


    def acquire_hosts(self, host_jobs):
        """Override acquire_hosts.

        This method overrides the method in the parent class.
        It figures out the set of suites that |host_jobs| belong to, gets the
        min_duts requirement for each suite, and pipes those minimums to the
        rdb along with the jobs.
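
        A rough sketch of the hand-off (the ids and counts here are made up
        for illustration): if |host_jobs| contains entries whose parent
        suites are 100 and 200, this method asks the suite recorder how many
        duts each of those suites is still owed and forwards that map, e.g.
        {100: 3, 200: 0}, to rdb_lib.acquire_hosts() together with the jobs.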

        """
        parent_job_ids = set([q.job.parent_job_id
                              for q in host_jobs if q.job.parent_job_id])
        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)


    @metrics.SecondsTimerDecorator('%s/tick_time' % _METRICS_PREFIX)
    def tick(self):
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        released_hosts = self._release_hosts()
        logging.info('Updating suite assignment with released hosts.')
        self._suite_recorder.record_release(released_hosts)
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()


class DummyHostScheduler(BaseHostScheduler):
    """A dummy host scheduler that doesn't acquire or release hosts."""

    def __init__(self):
        pass


    def tick(self):
        pass


def handle_signal(signum, frame):
    """SIGINT handler so we don't crash mid-tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize(testing=False):
    """Initialize the host scheduler."""
    if testing:
        # Don't import testing utilities unless we're in testing mode,
        # as the database imports have side effects.
        from autotest_lib.scheduler import rdb_testing_utils
        rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing(
                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='host_scheduler')
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)
    scheduler_models.initialize()


def parse_arguments(argv):
    """
    Parse command line arguments.

    @param argv: Argument list to parse.
    @returns: Parsed arguments.
    """
    parser = argparse.ArgumentParser(description='Host scheduler.')
    parser.add_argument('--testing', action='store_true', default=False,
                        help='Start the host scheduler in testing mode.')
    parser.add_argument('--production',
                        help=('Indicate that the scheduler is running in a '
                              'production environment and may use a database '
                              'that is not hosted on localhost. If this is '
                              'not set, the scheduler will fail if the '
                              'database is not on localhost.'),
                        action='store_true', default=False)
    parser.add_argument(
            '--lifetime-hours',
            type=float,
            default=None,
            help='If provided, number of hours the scheduler should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    parser.add_argument(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon.',
            type=str,
            default=None,
    )
    options = parser.parse_args(argv)

    return options


def main():
    if _monitor_db_host_acquisition:
        logging.info('Please set inline_host_acquisition=False in the shadow '
                     'config before starting the host scheduler.')
        sys.exit(0)
    try:
        options = parse_arguments(sys.argv[1:])
        scheduler_lib.check_production_settings(options)

        # If the server database is enabled, check whether this server has the
        # `host_scheduler` role. If it does not, an exception is raised and
        # the host scheduler will not continue to run.
        if server_manager_utils.use_server_db():
            server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                         role='host_scheduler')

        initialize(options.testing)

        with ts_mon_config.SetupTsMonGlobalState(
                'autotest_host_scheduler',
                indirect=True,
                debug_file=options.metrics_file,
        ):
            metrics.Counter('%s/start' % _METRICS_PREFIX).increment()
            process_start_time = time.time()
            host_scheduler = HostScheduler()
            minimum_tick_sec = global_config.global_config.get_config_value(
                    'SCHEDULER', 'host_scheduler_minimum_tick_sec', type=float)
            while not _shutdown:
                if _lifetime_expired(options.lifetime_hours,
                                     process_start_time):
                    break
                start = time.time()
                host_scheduler.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
            logging.info('Shutdown request received. Bye! Bye!')
    except server_manager_utils.ServerActionError:
        # This error is expected when the server is not in primary status
        # for the host-scheduler role, so do not send email about it.
        raise
    except Exception:
        metrics.Counter('%s/uncaught_exception' % _METRICS_PREFIX).increment()
        raise
    finally:
        email_manager.manager.send_queued_emails()
        if _db_manager:
            _db_manager.disconnect()


def _lifetime_expired(lifetime_hours, process_start_time):
    """Returns True if the process lifetime has expired, False otherwise.

    Also sets the global _shutdown flag so that any background processes also
    take the cue to exit.
    """
    if lifetime_hours is None:
        return False
    if time.time() - process_start_time > lifetime_hours * 3600:
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        global _shutdown
        _shutdown = True
        return True
    return False


if __name__ == '__main__':
    main()
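
# A rough invocation sketch (the file name and flag values below are
# assumptions for illustration, not a canonical command): after setting
# inline_host_acquisition=False in the shadow config, something like
#     python host_scheduler.py --lifetime-hours 24 --metrics-file /tmp/hs.json
# runs the service for a day and dumps metrics to a local file instead of
# reporting them to ts_mon.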