1#!/usr/bin/python 2 3#pylint: disable=C0111 4 5""" 6Autotest scheduler 7""" 8 9import datetime 10import functools 11import gc 12import logging 13import optparse 14import os 15import signal 16import sys 17import time 18 19import common 20from autotest_lib.frontend import setup_django_environment 21 22import django.db 23 24from autotest_lib.client.common_lib import control_data 25from autotest_lib.client.common_lib import global_config 26from autotest_lib.client.common_lib import utils 27from autotest_lib.frontend.afe import models 28from autotest_lib.scheduler import agent_task, drone_manager 29from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler 30from autotest_lib.scheduler import monitor_db_cleanup, prejob_task 31from autotest_lib.scheduler import postjob_task 32from autotest_lib.scheduler import query_managers 33from autotest_lib.scheduler import scheduler_lib 34from autotest_lib.scheduler import scheduler_models 35from autotest_lib.scheduler import status_server, scheduler_config 36from autotest_lib.server import autoserv_utils 37from autotest_lib.server import system_utils 38from autotest_lib.server import utils as server_utils 39from autotest_lib.site_utils import metadata_reporter 40from autotest_lib.site_utils import server_manager_utils 41 42try: 43 from chromite.lib import metrics 44 from chromite.lib import ts_mon_config 45except ImportError: 46 metrics = utils.metrics_mock 47 ts_mon_config = utils.metrics_mock 48 49 50BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter' 51PID_FILE_PREFIX = 'monitor_db' 52 53RESULTS_DIR = '.' 54AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..') 55 56if os.environ.has_key('AUTOTEST_DIR'): 57 AUTOTEST_PATH = os.environ['AUTOTEST_DIR'] 58AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server') 59AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko') 60 61if AUTOTEST_SERVER_DIR not in sys.path: 62 sys.path.insert(0, AUTOTEST_SERVER_DIR) 63 64# error message to leave in results dir when an autoserv process disappears 65# mysteriously 66_LOST_PROCESS_ERROR = """\ 67Autoserv failed abnormally during execution for this job, probably due to a 68system error on the Autotest server. Full results may not be available. Sorry. 69""" 70 71_db_manager = None 72_db = None 73_shutdown = False 74 75# These 2 globals are replaced for testing 76_autoserv_directory = autoserv_utils.autoserv_directory 77_autoserv_path = autoserv_utils.autoserv_path 78_testing_mode = False 79_drone_manager = None 80 81 82def _site_init_monitor_db_dummy(): 83 return {} 84 85 86def _verify_default_drone_set_exists(): 87 if (models.DroneSet.drone_sets_enabled() and 88 not models.DroneSet.default_drone_set_name()): 89 raise scheduler_lib.SchedulerError( 90 'Drone sets are enabled, but no default is set') 91 92 93def _sanity_check(): 94 """Make sure the configs are consistent before starting the scheduler""" 95 _verify_default_drone_set_exists() 96 97 98def main(): 99 try: 100 try: 101 main_without_exception_handling() 102 except SystemExit: 103 raise 104 except: 105 logging.exception('Exception escaping in monitor_db') 106 raise 107 finally: 108 utils.delete_pid_file_if_exists(PID_FILE_PREFIX) 109 110 111def main_without_exception_handling(): 112 scheduler_lib.setup_logging( 113 os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None), 114 os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)) 115 usage = 'usage: %prog [options] results_dir' 116 parser = optparse.OptionParser(usage) 117 parser.add_option('--recover-hosts', help='Try to recover dead hosts', 118 action='store_true') 119 parser.add_option('--test', help='Indicate that scheduler is under ' + 120 'test and should use dummy autoserv and no parsing', 121 action='store_true') 122 parser.add_option('--production', 123 help=('Indicate that scheduler is running in production ' 124 'environment and it can use database that is not ' 125 'hosted in localhost. If it is set to False, ' 126 'scheduler will fail if database is not in ' 127 'localhost.'), 128 action='store_true', default=False) 129 (options, args) = parser.parse_args() 130 if len(args) != 1: 131 parser.print_usage() 132 return 133 134 scheduler_lib.check_production_settings(options) 135 136 scheduler_enabled = global_config.global_config.get_config_value( 137 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool) 138 139 if not scheduler_enabled: 140 logging.error("Scheduler not enabled, set enable_scheduler to true in " 141 "the global_config's SCHEDULER section to enable it. " 142 "Exiting.") 143 sys.exit(1) 144 145 global RESULTS_DIR 146 RESULTS_DIR = args[0] 147 148 site_init = utils.import_site_function(__file__, 149 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db", 150 _site_init_monitor_db_dummy) 151 site_init() 152 153 # Change the cwd while running to avoid issues incase we were launched from 154 # somewhere odd (such as a random NFS home directory of the person running 155 # sudo to launch us as the appropriate user). 156 os.chdir(RESULTS_DIR) 157 158 # This is helpful for debugging why stuff a scheduler launches is 159 # misbehaving. 160 logging.info('os.environ: %s', os.environ) 161 162 if options.test: 163 global _autoserv_path 164 _autoserv_path = 'autoserv_dummy' 165 global _testing_mode 166 _testing_mode = True 167 168 server = status_server.StatusServer() 169 server.start() 170 171 # Start the thread to report metadata. 172 metadata_reporter.start() 173 174 with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler', 175 indirect=True): 176 try: 177 initialize() 178 dispatcher = Dispatcher() 179 dispatcher.initialize(recover_hosts=options.recover_hosts) 180 minimum_tick_sec = global_config.global_config.get_config_value( 181 scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float) 182 183 while not _shutdown and not server._shutdown_scheduler: 184 start = time.time() 185 dispatcher.tick() 186 curr_tick_sec = time.time() - start 187 if minimum_tick_sec > curr_tick_sec: 188 time.sleep(minimum_tick_sec - curr_tick_sec) 189 else: 190 time.sleep(0.0001) 191 except server_manager_utils.ServerActionError as e: 192 # This error is expected when the server is not in primary status 193 # for scheduler role. Thus do not send email for it. 194 logging.exception(e) 195 except Exception: 196 email_manager.manager.log_stacktrace( 197 "Uncaught exception; terminating monitor_db") 198 199 metadata_reporter.abort() 200 email_manager.manager.send_queued_emails() 201 server.shutdown() 202 _drone_manager.shutdown() 203 _db_manager.disconnect() 204 205 206def handle_signal(signum, frame): 207 global _shutdown 208 _shutdown = True 209 logging.info("Shutdown request received.") 210 211 212def initialize(): 213 logging.info("%s> dispatcher starting", time.strftime("%X %x")) 214 logging.info("My PID is %d", os.getpid()) 215 216 if utils.program_is_alive(PID_FILE_PREFIX): 217 logging.critical("monitor_db already running, aborting!") 218 sys.exit(1) 219 utils.write_pid(PID_FILE_PREFIX) 220 221 if _testing_mode: 222 global_config.global_config.override_config_value( 223 scheduler_lib.DB_CONFIG_SECTION, 'database', 224 'stresstest_autotest_web') 225 226 # If server database is enabled, check if the server has role `scheduler`. 227 # If the server does not have scheduler role, exception will be raised and 228 # scheduler will not continue to run. 229 if server_manager_utils.use_server_db(): 230 server_manager_utils.confirm_server_has_role(hostname='localhost', 231 role='scheduler') 232 233 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH'] 234 global _db_manager 235 _db_manager = scheduler_lib.ConnectionManager() 236 global _db 237 _db = _db_manager.get_connection() 238 logging.info("Setting signal handler") 239 signal.signal(signal.SIGINT, handle_signal) 240 signal.signal(signal.SIGTERM, handle_signal) 241 242 initialize_globals() 243 scheduler_models.initialize() 244 245 drone_list = system_utils.get_drones() 246 results_host = global_config.global_config.get_config_value( 247 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost') 248 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host) 249 250 logging.info("Connected! Running...") 251 252 253def initialize_globals(): 254 global _drone_manager 255 _drone_manager = drone_manager.instance() 256 257 258def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None, 259 verbose=True): 260 """ 261 @returns The autoserv command line as a list of executable + parameters. 262 263 @param machines - string - A machine or comma separated list of machines 264 for the (-m) flag. 265 @param extra_args - list - Additional arguments to pass to autoserv. 266 @param job - Job object - If supplied, -u owner, -l name, --test-retry, 267 and client -c or server -s parameters will be added. 268 @param queue_entry - A HostQueueEntry object - If supplied and no Job 269 object was supplied, this will be used to lookup the Job object. 270 """ 271 command = autoserv_utils.autoserv_run_job_command(_autoserv_directory, 272 machines, results_directory=drone_manager.WORKING_DIRECTORY, 273 extra_args=extra_args, job=job, queue_entry=queue_entry, 274 verbose=verbose, in_lab=True) 275 return command 276 277def _calls_log_tick_msg(func): 278 """Used to trace functions called by BaseDispatcher.tick.""" 279 @functools.wraps(func) 280 def wrapper(self, *args, **kwargs): 281 self._log_tick_msg('Starting %s' % func.__name__) 282 return func(self, *args, **kwargs) 283 284 return wrapper 285 286 287class BaseDispatcher(object): 288 289 290 def __init__(self): 291 self._agents = [] 292 self._last_clean_time = time.time() 293 user_cleanup_time = scheduler_config.config.clean_interval_minutes 294 self._periodic_cleanup = monitor_db_cleanup.UserCleanup( 295 _db, user_cleanup_time) 296 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep( 297 _db, _drone_manager) 298 self._host_agents = {} 299 self._queue_entry_agents = {} 300 self._tick_count = 0 301 self._last_garbage_stats_time = time.time() 302 self._seconds_between_garbage_stats = 60 * ( 303 global_config.global_config.get_config_value( 304 scheduler_config.CONFIG_SECTION, 305 'gc_stats_interval_mins', type=int, default=6*60)) 306 self._tick_debug = global_config.global_config.get_config_value( 307 scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool, 308 default=False) 309 self._extra_debugging = global_config.global_config.get_config_value( 310 scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool, 311 default=False) 312 self._inline_host_acquisition = ( 313 global_config.global_config.get_config_value( 314 scheduler_config.CONFIG_SECTION, 315 'inline_host_acquisition', type=bool, default=True)) 316 317 # If _inline_host_acquisition is set the scheduler will acquire and 318 # release hosts against jobs inline, with the tick. Otherwise the 319 # scheduler will only focus on jobs that already have hosts, and 320 # will not explicitly unlease a host when a job finishes using it. 321 self._job_query_manager = query_managers.AFEJobQueryManager() 322 self._host_scheduler = (host_scheduler.BaseHostScheduler() 323 if self._inline_host_acquisition else 324 host_scheduler.DummyHostScheduler()) 325 326 327 def initialize(self, recover_hosts=True): 328 self._periodic_cleanup.initialize() 329 self._24hr_upkeep.initialize() 330 # Execute all actions queued in the cleanup tasks. Scheduler tick will 331 # run a refresh task first. If there is any action in the queue, refresh 332 # will raise an exception. 333 _drone_manager.execute_actions() 334 335 # always recover processes 336 self._recover_processes() 337 338 if recover_hosts: 339 self._recover_hosts() 340 341 342 # TODO(pprabhu) Drop this metric once tick_times has been verified. 343 @metrics.SecondsTimerDecorator( 344 'chromeos/autotest/scheduler/tick_durations/tick') 345 def tick(self): 346 """ 347 This is an altered version of tick() where we keep track of when each 348 major step begins so we can try to figure out where we are using most 349 of the tick time. 350 """ 351 with metrics.RuntimeBreakdownTimer( 352 'chromeos/autotest/scheduler/tick_times') as breakdown_timer: 353 self._log_tick_msg('New tick') 354 system_utils.DroneCache.refresh() 355 356 with breakdown_timer.Step('garbage_collection'): 357 self._garbage_collection() 358 with breakdown_timer.Step('trigger_refresh'): 359 self._log_tick_msg('Starting _drone_manager.trigger_refresh') 360 _drone_manager.trigger_refresh() 361 with breakdown_timer.Step('schedule_running_host_queue_entries'): 362 self._schedule_running_host_queue_entries() 363 with breakdown_timer.Step('schedule_special_tasks'): 364 self._schedule_special_tasks() 365 with breakdown_timer.Step('schedule_new_jobs'): 366 self._schedule_new_jobs() 367 with breakdown_timer.Step('sync_refresh'): 368 self._log_tick_msg('Starting _drone_manager.sync_refresh') 369 _drone_manager.sync_refresh() 370 # _run_cleanup must be called between drone_manager.sync_refresh, 371 # and drone_manager.execute_actions, as sync_refresh will clear the 372 # calls queued in drones. Therefore, any action that calls 373 # drone.queue_call to add calls to the drone._calls, should be after 374 # drone refresh is completed and before 375 # drone_manager.execute_actions at the end of the tick. 376 with breakdown_timer.Step('run_cleanup'): 377 self._run_cleanup() 378 with breakdown_timer.Step('find_aborting'): 379 self._find_aborting() 380 with breakdown_timer.Step('find_aborted_special_tasks'): 381 self._find_aborted_special_tasks() 382 with breakdown_timer.Step('handle_agents'): 383 self._handle_agents() 384 with breakdown_timer.Step('host_scheduler_tick'): 385 self._log_tick_msg('Starting _host_scheduler.tick') 386 self._host_scheduler.tick() 387 with breakdown_timer.Step('drones_execute_actions'): 388 self._log_tick_msg('Starting _drone_manager.execute_actions') 389 _drone_manager.execute_actions() 390 with breakdown_timer.Step('send_queued_emails'): 391 self._log_tick_msg( 392 'Starting email_manager.manager.send_queued_emails') 393 email_manager.manager.send_queued_emails() 394 with breakdown_timer.Step('db_reset_queries'): 395 self._log_tick_msg('Starting django.db.reset_queries') 396 django.db.reset_queries() 397 398 self._tick_count += 1 399 metrics.Counter('chromeos/autotest/scheduler/tick').increment() 400 401 402 @_calls_log_tick_msg 403 def _run_cleanup(self): 404 self._periodic_cleanup.run_cleanup_maybe() 405 self._24hr_upkeep.run_cleanup_maybe() 406 407 408 @_calls_log_tick_msg 409 def _garbage_collection(self): 410 threshold_time = time.time() - self._seconds_between_garbage_stats 411 if threshold_time < self._last_garbage_stats_time: 412 # Don't generate these reports very often. 413 return 414 415 self._last_garbage_stats_time = time.time() 416 # Force a full level 0 collection (because we can, it doesn't hurt 417 # at this interval). 418 gc.collect() 419 logging.info('Logging garbage collector stats on tick %d.', 420 self._tick_count) 421 gc_stats._log_garbage_collector_stats() 422 423 424 def _register_agent_for_ids(self, agent_dict, object_ids, agent): 425 for object_id in object_ids: 426 agent_dict.setdefault(object_id, set()).add(agent) 427 428 429 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent): 430 for object_id in object_ids: 431 assert object_id in agent_dict 432 agent_dict[object_id].remove(agent) 433 # If an ID has no more active agent associated, there is no need to 434 # keep it in the dictionary. Otherwise, scheduler will keep an 435 # unnecessarily big dictionary until being restarted. 436 if not agent_dict[object_id]: 437 agent_dict.pop(object_id) 438 439 440 def add_agent_task(self, agent_task): 441 """ 442 Creates and adds an agent to the dispatchers list. 443 444 In creating the agent we also pass on all the queue_entry_ids and 445 host_ids from the special agent task. For every agent we create, we 446 add it to 1. a dict against the queue_entry_ids given to it 2. A dict 447 against the host_ids given to it. So theoritically, a host can have any 448 number of agents associated with it, and each of them can have any 449 special agent task, though in practice we never see > 1 agent/task per 450 host at any time. 451 452 @param agent_task: A SpecialTask for the agent to manage. 453 """ 454 agent = Agent(agent_task) 455 self._agents.append(agent) 456 agent.dispatcher = self 457 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent) 458 self._register_agent_for_ids(self._queue_entry_agents, 459 agent.queue_entry_ids, agent) 460 461 462 def get_agents_for_entry(self, queue_entry): 463 """ 464 Find agents corresponding to the specified queue_entry. 465 """ 466 return list(self._queue_entry_agents.get(queue_entry.id, set())) 467 468 469 def host_has_agent(self, host): 470 """ 471 Determine if there is currently an Agent present using this host. 472 """ 473 return bool(self._host_agents.get(host.id, None)) 474 475 476 def remove_agent(self, agent): 477 self._agents.remove(agent) 478 self._unregister_agent_for_ids(self._host_agents, agent.host_ids, 479 agent) 480 self._unregister_agent_for_ids(self._queue_entry_agents, 481 agent.queue_entry_ids, agent) 482 483 484 def _host_has_scheduled_special_task(self, host): 485 return bool(models.SpecialTask.objects.filter(host__id=host.id, 486 is_active=False, 487 is_complete=False)) 488 489 490 def _recover_processes(self): 491 agent_tasks = self._create_recovery_agent_tasks() 492 self._register_pidfiles(agent_tasks) 493 _drone_manager.refresh() 494 self._recover_tasks(agent_tasks) 495 self._recover_pending_entries() 496 self._check_for_unrecovered_verifying_entries() 497 self._reverify_remaining_hosts() 498 # reinitialize drones after killing orphaned processes, since they can 499 # leave around files when they die 500 _drone_manager.execute_actions() 501 _drone_manager.reinitialize_drones() 502 503 504 def _create_recovery_agent_tasks(self): 505 return (self._get_queue_entry_agent_tasks() 506 + self._get_special_task_agent_tasks(is_active=True)) 507 508 509 def _get_queue_entry_agent_tasks(self): 510 """ 511 Get agent tasks for all hqe in the specified states. 512 513 Loosely this translates to taking a hqe in one of the specified states, 514 say parsing, and getting an AgentTask for it, like the FinalReparseTask, 515 through _get_agent_task_for_queue_entry. Each queue entry can only have 516 one agent task at a time, but there might be multiple queue entries in 517 the group. 518 519 @return: A list of AgentTasks. 520 """ 521 # host queue entry statuses handled directly by AgentTasks (Verifying is 522 # handled through SpecialTasks, so is not listed here) 523 statuses = (models.HostQueueEntry.Status.STARTING, 524 models.HostQueueEntry.Status.RUNNING, 525 models.HostQueueEntry.Status.GATHERING, 526 models.HostQueueEntry.Status.PARSING, 527 models.HostQueueEntry.Status.ARCHIVING) 528 status_list = ','.join("'%s'" % status for status in statuses) 529 queue_entries = scheduler_models.HostQueueEntry.fetch( 530 where='status IN (%s)' % status_list) 531 532 agent_tasks = [] 533 used_queue_entries = set() 534 hqe_count_by_status = {} 535 for entry in queue_entries: 536 hqe_count_by_status[entry.status] = ( 537 hqe_count_by_status.get(entry.status, 0) + 1) 538 if self.get_agents_for_entry(entry): 539 # already being handled 540 continue 541 if entry in used_queue_entries: 542 # already picked up by a synchronous job 543 continue 544 agent_task = self._get_agent_task_for_queue_entry(entry) 545 agent_tasks.append(agent_task) 546 used_queue_entries.update(agent_task.queue_entries) 547 548 for status, count in hqe_count_by_status.iteritems(): 549 metrics.Gauge( 550 'chromeos/autotest/scheduler/active_host_queue_entries' 551 ).set(count, fields={'status': status}) 552 553 return agent_tasks 554 555 556 def _get_special_task_agent_tasks(self, is_active=False): 557 special_tasks = models.SpecialTask.objects.filter( 558 is_active=is_active, is_complete=False) 559 return [self._get_agent_task_for_special_task(task) 560 for task in special_tasks] 561 562 563 def _get_agent_task_for_queue_entry(self, queue_entry): 564 """ 565 Construct an AgentTask instance for the given active HostQueueEntry. 566 567 @param queue_entry: a HostQueueEntry 568 @return: an AgentTask to run the queue entry 569 """ 570 task_entries = queue_entry.job.get_group_entries(queue_entry) 571 self._check_for_duplicate_host_entries(task_entries) 572 573 if queue_entry.status in (models.HostQueueEntry.Status.STARTING, 574 models.HostQueueEntry.Status.RUNNING): 575 if queue_entry.is_hostless(): 576 return HostlessQueueTask(queue_entry=queue_entry) 577 return QueueTask(queue_entries=task_entries) 578 if queue_entry.status == models.HostQueueEntry.Status.GATHERING: 579 return postjob_task.GatherLogsTask(queue_entries=task_entries) 580 if queue_entry.status == models.HostQueueEntry.Status.PARSING: 581 return postjob_task.FinalReparseTask(queue_entries=task_entries) 582 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING: 583 return postjob_task.ArchiveResultsTask(queue_entries=task_entries) 584 585 raise scheduler_lib.SchedulerError( 586 '_get_agent_task_for_queue_entry got entry with ' 587 'invalid status %s: %s' % (queue_entry.status, queue_entry)) 588 589 590 def _check_for_duplicate_host_entries(self, task_entries): 591 non_host_statuses = (models.HostQueueEntry.Status.PARSING, 592 models.HostQueueEntry.Status.ARCHIVING) 593 for task_entry in task_entries: 594 using_host = (task_entry.host is not None 595 and task_entry.status not in non_host_statuses) 596 if using_host: 597 self._assert_host_has_no_agent(task_entry) 598 599 600 def _assert_host_has_no_agent(self, entry): 601 """ 602 @param entry: a HostQueueEntry or a SpecialTask 603 """ 604 if self.host_has_agent(entry.host): 605 agent = tuple(self._host_agents.get(entry.host.id))[0] 606 raise scheduler_lib.SchedulerError( 607 'While scheduling %s, host %s already has a host agent %s' 608 % (entry, entry.host, agent.task)) 609 610 611 def _get_agent_task_for_special_task(self, special_task): 612 """ 613 Construct an AgentTask class to run the given SpecialTask and add it 614 to this dispatcher. 615 616 A special task is created through schedule_special_tasks, but only if 617 the host doesn't already have an agent. This happens through 618 add_agent_task. All special agent tasks are given a host on creation, 619 and a Null hqe. To create a SpecialAgentTask object, you need a 620 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask 621 object contains a hqe it's passed on to the special agent task, which 622 creates a HostQueueEntry and saves it as it's queue_entry. 623 624 @param special_task: a models.SpecialTask instance 625 @returns an AgentTask to run this SpecialTask 626 """ 627 self._assert_host_has_no_agent(special_task) 628 629 special_agent_task_classes = (prejob_task.CleanupTask, 630 prejob_task.VerifyTask, 631 prejob_task.RepairTask, 632 prejob_task.ResetTask, 633 prejob_task.ProvisionTask) 634 635 for agent_task_class in special_agent_task_classes: 636 if agent_task_class.TASK_TYPE == special_task.task: 637 return agent_task_class(task=special_task) 638 639 raise scheduler_lib.SchedulerError( 640 'No AgentTask class for task', str(special_task)) 641 642 643 def _register_pidfiles(self, agent_tasks): 644 for agent_task in agent_tasks: 645 agent_task.register_necessary_pidfiles() 646 647 648 def _recover_tasks(self, agent_tasks): 649 orphans = _drone_manager.get_orphaned_autoserv_processes() 650 651 for agent_task in agent_tasks: 652 agent_task.recover() 653 if agent_task.monitor and agent_task.monitor.has_process(): 654 orphans.discard(agent_task.monitor.get_process()) 655 self.add_agent_task(agent_task) 656 657 self._check_for_remaining_orphan_processes(orphans) 658 659 660 def _get_unassigned_entries(self, status): 661 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'" 662 % status): 663 if entry.status == status and not self.get_agents_for_entry(entry): 664 # The status can change during iteration, e.g., if job.run() 665 # sets a group of queue entries to Starting 666 yield entry 667 668 669 def _check_for_remaining_orphan_processes(self, orphans): 670 m = 'chromeos/autotest/errors/unrecovered_orphan_processes' 671 metrics.Gauge(m).set(len(orphans)) 672 673 if not orphans: 674 return 675 subject = 'Unrecovered orphan autoserv processes remain' 676 message = '\n'.join(str(process) for process in orphans) 677 die_on_orphans = global_config.global_config.get_config_value( 678 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool) 679 680 if die_on_orphans: 681 raise RuntimeError(subject + '\n' + message) 682 683 684 def _recover_pending_entries(self): 685 for entry in self._get_unassigned_entries( 686 models.HostQueueEntry.Status.PENDING): 687 logging.info('Recovering Pending entry %s', entry) 688 entry.on_pending() 689 690 691 def _check_for_unrecovered_verifying_entries(self): 692 queue_entries = scheduler_models.HostQueueEntry.fetch( 693 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING) 694 unrecovered_hqes = [] 695 for queue_entry in queue_entries: 696 special_tasks = models.SpecialTask.objects.filter( 697 task__in=(models.SpecialTask.Task.CLEANUP, 698 models.SpecialTask.Task.VERIFY), 699 queue_entry__id=queue_entry.id, 700 is_complete=False) 701 if special_tasks.count() == 0: 702 unrecovered_hqes.append(queue_entry) 703 704 if unrecovered_hqes: 705 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes) 706 raise scheduler_lib.SchedulerError( 707 '%d unrecovered verifying host queue entries:\n%s' % 708 (len(unrecovered_hqes), message)) 709 710 711 @_calls_log_tick_msg 712 def _schedule_special_tasks(self): 713 """ 714 Execute queued SpecialTasks that are ready to run on idle hosts. 715 716 Special tasks include PreJobTasks like verify, reset and cleanup. 717 They are created through _schedule_new_jobs and associated with a hqe 718 This method translates SpecialTasks to the appropriate AgentTask and 719 adds them to the dispatchers agents list, so _handle_agents can execute 720 them. 721 """ 722 # When the host scheduler is responsible for acquisition we only want 723 # to run tasks with leased hosts. All hqe tasks will already have 724 # leased hosts, and we don't want to run frontend tasks till the host 725 # scheduler has vetted the assignment. Note that this doesn't include 726 # frontend tasks with hosts leased by other active hqes. 727 for task in self._job_query_manager.get_prioritized_special_tasks( 728 only_tasks_with_leased_hosts=not self._inline_host_acquisition): 729 if self.host_has_agent(task.host): 730 continue 731 self.add_agent_task(self._get_agent_task_for_special_task(task)) 732 733 734 def _reverify_remaining_hosts(self): 735 # recover active hosts that have not yet been recovered, although this 736 # should never happen 737 message = ('Recovering active host %s - this probably indicates a ' 738 'scheduler bug') 739 self._reverify_hosts_where( 740 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')", 741 print_message=message) 742 743 744 def _reverify_hosts_where(self, where, 745 print_message='Reverifying host %s'): 746 full_where='locked = 0 AND invalid = 0 AND ' + where 747 for host in scheduler_models.Host.fetch(where=full_where): 748 if self.host_has_agent(host): 749 # host has already been recovered in some way 750 continue 751 if self._host_has_scheduled_special_task(host): 752 # host will have a special task scheduled on the next tick 753 continue 754 if print_message: 755 logging.info(print_message, host.hostname) 756 models.SpecialTask.objects.create( 757 task=models.SpecialTask.Task.CLEANUP, 758 host=models.Host.objects.get(id=host.id)) 759 760 761 def _recover_hosts(self): 762 # recover "Repair Failed" hosts 763 message = 'Reverifying dead host %s' 764 self._reverify_hosts_where("status = 'Repair Failed'", 765 print_message=message) 766 767 768 def _refresh_pending_queue_entries(self): 769 """ 770 Lookup the pending HostQueueEntries and call our HostScheduler 771 refresh() method given that list. Return the list. 772 773 @returns A list of pending HostQueueEntries sorted in priority order. 774 """ 775 queue_entries = self._job_query_manager.get_pending_queue_entries( 776 only_hostless=not self._inline_host_acquisition) 777 if not queue_entries: 778 return [] 779 return queue_entries 780 781 782 def _schedule_hostless_job(self, queue_entry): 783 """Schedule a hostless (suite) job. 784 785 @param queue_entry: The queue_entry representing the hostless job. 786 """ 787 self.add_agent_task(HostlessQueueTask(queue_entry)) 788 789 # Need to set execution_subdir before setting the status: 790 # After a restart of the scheduler, agents will be restored for HQEs in 791 # Starting, Running, Gathering, Parsing or Archiving. To do this, the 792 # execution_subdir is needed. Therefore it must be set before entering 793 # one of these states. 794 # Otherwise, if the scheduler was interrupted between setting the status 795 # and the execution_subdir, upon it's restart restoring agents would 796 # fail. 797 # Is there a way to get a status in one of these states without going 798 # through this code? Following cases are possible: 799 # - If it's aborted before being started: 800 # active bit will be 0, so there's nothing to parse, it will just be 801 # set to completed by _find_aborting. Critical statuses are skipped. 802 # - If it's aborted or it fails after being started: 803 # It was started, so this code was executed. 804 queue_entry.update_field('execution_subdir', 'hostless') 805 queue_entry.set_status(models.HostQueueEntry.Status.STARTING) 806 807 808 def _schedule_host_job(self, host, queue_entry): 809 """Schedules a job on the given host. 810 811 1. Assign the host to the hqe, if it isn't already assigned. 812 2. Create a SpecialAgentTask for the hqe. 813 3. Activate the hqe. 814 815 @param queue_entry: The job to schedule. 816 @param host: The host to schedule the job on. 817 """ 818 if self.host_has_agent(host): 819 host_agent_task = list(self._host_agents.get(host.id))[0].task 820 else: 821 self._host_scheduler.schedule_host_job(host, queue_entry) 822 823 824 @_calls_log_tick_msg 825 def _schedule_new_jobs(self): 826 """ 827 Find any new HQEs and call schedule_pre_job_tasks for it. 828 829 This involves setting the status of the HQE and creating a row in the 830 db corresponding the the special task, through 831 scheduler_models._queue_special_task. The new db row is then added as 832 an agent to the dispatcher through _schedule_special_tasks and 833 scheduled for execution on the drone through _handle_agents. 834 """ 835 queue_entries = self._refresh_pending_queue_entries() 836 837 key = 'scheduler.jobs_per_tick' 838 new_hostless_jobs = 0 839 new_jobs_with_hosts = 0 840 new_jobs_need_hosts = 0 841 host_jobs = [] 842 logging.debug('Processing %d queue_entries', len(queue_entries)) 843 844 for queue_entry in queue_entries: 845 if queue_entry.is_hostless(): 846 self._schedule_hostless_job(queue_entry) 847 new_hostless_jobs = new_hostless_jobs + 1 848 else: 849 host_jobs.append(queue_entry) 850 new_jobs_need_hosts = new_jobs_need_hosts + 1 851 852 metrics.Counter( 853 'chromeos/autotest/scheduler/scheduled_jobs_hostless' 854 ).increment_by(new_hostless_jobs) 855 856 if not host_jobs: 857 return 858 859 if not self._inline_host_acquisition: 860 # In this case, host_scheduler is responsible for scheduling 861 # host_jobs. Scheduling the jobs ourselves can lead to DB corruption 862 # since host_scheduler assumes it is the single process scheduling 863 # host jobs. 864 metrics.Gauge( 865 'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set( 866 len(host_jobs)) 867 return 868 869 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs) 870 for host_assignment in jobs_with_hosts: 871 self._schedule_host_job(host_assignment.host, host_assignment.job) 872 new_jobs_with_hosts = new_jobs_with_hosts + 1 873 874 metrics.Counter( 875 'chromeos/autotest/scheduler/scheduled_jobs_with_hosts' 876 ).increment_by(new_jobs_with_hosts) 877 # TODO(pprabhu): Decide what to do about this metric. Million dollar 878 # question: What happens to jobs that were not matched. Do they stay in 879 # the queue, and get processed right here in the next tick (then we want 880 # a guage corresponding to the number of outstanding unmatched host 881 # jobs), or are they handled somewhere else (then we need a counter 882 # corresponding to failed_to_match_with_hosts jobs). 883 #autotest_stats.Gauge(key).send('new_jobs_without_hosts', 884 # new_jobs_need_hosts - 885 # new_jobs_with_hosts) 886 887 888 @_calls_log_tick_msg 889 def _schedule_running_host_queue_entries(self): 890 """ 891 Adds agents to the dispatcher. 892 893 Any AgentTask, like the QueueTask, is wrapped in an Agent. The 894 QueueTask for example, will have a job with a control file, and 895 the agent will have methods that poll, abort and check if the queue 896 task is finished. The dispatcher runs the agent_task, as well as 897 other agents in it's _agents member, through _handle_agents, by 898 calling the Agents tick(). 899 900 This method creates an agent for each HQE in one of (starting, running, 901 gathering, parsing, archiving) states, and adds it to the dispatcher so 902 it is handled by _handle_agents. 903 """ 904 for agent_task in self._get_queue_entry_agent_tasks(): 905 self.add_agent_task(agent_task) 906 907 908 @_calls_log_tick_msg 909 def _find_aborting(self): 910 """ 911 Looks through the afe_host_queue_entries for an aborted entry. 912 913 The aborted bit is set on an HQE in many ways, the most common 914 being when a user requests an abort through the frontend, which 915 results in an rpc from the afe to abort_host_queue_entries. 916 """ 917 jobs_to_stop = set() 918 for entry in scheduler_models.HostQueueEntry.fetch( 919 where='aborted=1 and complete=0'): 920 921 # If the job is running on a shard, let the shard handle aborting 922 # it and sync back the right status. 923 if entry.job.shard_id is not None and not server_utils.is_shard(): 924 logging.info('Waiting for shard %s to abort hqe %s', 925 entry.job.shard_id, entry) 926 continue 927 928 logging.info('Aborting %s', entry) 929 930 # The task would have started off with both is_complete and 931 # is_active = False. Aborted tasks are neither active nor complete. 932 # For all currently active tasks this will happen through the agent, 933 # but we need to manually update the special tasks that haven't 934 # started yet, because they don't have agents. 935 models.SpecialTask.objects.filter(is_active=False, 936 queue_entry_id=entry.id).update(is_complete=True) 937 938 for agent in self.get_agents_for_entry(entry): 939 agent.abort() 940 entry.abort(self) 941 jobs_to_stop.add(entry.job) 942 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop)) 943 for job in jobs_to_stop: 944 job.stop_if_necessary() 945 946 947 @_calls_log_tick_msg 948 def _find_aborted_special_tasks(self): 949 """ 950 Find SpecialTasks that have been marked for abortion. 951 952 Poll the database looking for SpecialTasks that are active 953 and have been marked for abortion, then abort them. 954 """ 955 956 # The completed and active bits are very important when it comes 957 # to scheduler correctness. The active bit is set through the prolog 958 # of a special task, and reset through the cleanup method of the 959 # SpecialAgentTask. The cleanup is called both through the abort and 960 # epilog. The complete bit is set in several places, and in general 961 # a hanging job will have is_active=1 is_complete=0, while a special 962 # task which completed will have is_active=0 is_complete=1. To check 963 # aborts we directly check active because the complete bit is set in 964 # several places, including the epilog of agent tasks. 965 aborted_tasks = models.SpecialTask.objects.filter(is_active=True, 966 is_aborted=True) 967 for task in aborted_tasks: 968 # There are 2 ways to get the agent associated with a task, 969 # through the host and through the hqe. A special task 970 # always needs a host, but doesn't always need a hqe. 971 for agent in self._host_agents.get(task.host.id, []): 972 if isinstance(agent.task, agent_task.SpecialAgentTask): 973 974 # The epilog preforms critical actions such as 975 # queueing the next SpecialTask, requeuing the 976 # hqe etc, however it doesn't actually kill the 977 # monitor process and set the 'done' bit. Epilogs 978 # assume that the job failed, and that the monitor 979 # process has already written an exit code. The 980 # done bit is a necessary condition for 981 # _handle_agents to schedule any more special 982 # tasks against the host, and it must be set 983 # in addition to is_active, is_complete and success. 984 agent.task.epilog() 985 agent.task.abort() 986 987 988 def _can_start_agent(self, agent, have_reached_limit): 989 # always allow zero-process agents to run 990 if agent.task.num_processes == 0: 991 return True 992 # don't allow any nonzero-process agents to run after we've reached a 993 # limit (this avoids starvation of many-process agents) 994 if have_reached_limit: 995 return False 996 # total process throttling 997 max_runnable_processes = _drone_manager.max_runnable_processes( 998 agent.task.owner_username, 999 agent.task.get_drone_hostnames_allowed()) 1000 if agent.task.num_processes > max_runnable_processes: 1001 return False 1002 return True 1003 1004 1005 @_calls_log_tick_msg 1006 def _handle_agents(self): 1007 """ 1008 Handles agents of the dispatcher. 1009 1010 Appropriate Agents are added to the dispatcher through 1011 _schedule_running_host_queue_entries. These agents each 1012 have a task. This method runs the agents task through 1013 agent.tick() leading to: 1014 agent.start 1015 prolog -> AgentTasks prolog 1016 For each queue entry: 1017 sets host status/status to Running 1018 set started_on in afe_host_queue_entries 1019 run -> AgentTasks run 1020 Creates PidfileRunMonitor 1021 Queues the autoserv command line for this AgentTask 1022 via the drone manager. These commands are executed 1023 through the drone managers execute actions. 1024 poll -> AgentTasks/BaseAgentTask poll 1025 checks the monitors exit_code. 1026 Executes epilog if task is finished. 1027 Executes AgentTasks _finish_task 1028 finish_task is usually responsible for setting the status 1029 of the HQE/host, and updating it's active and complete fileds. 1030 1031 agent.is_done 1032 Removed the agent from the dispatchers _agents queue. 1033 Is_done checks the finished bit on the agent, that is 1034 set based on the Agents task. During the agents poll 1035 we check to see if the monitor process has exited in 1036 it's finish method, and set the success member of the 1037 task based on this exit code. 1038 """ 1039 num_started_this_tick = 0 1040 num_finished_this_tick = 0 1041 have_reached_limit = False 1042 # iterate over copy, so we can remove agents during iteration 1043 logging.debug('Handling %d Agents', len(self._agents)) 1044 for agent in list(self._agents): 1045 self._log_extra_msg('Processing Agent with Host Ids: %s and ' 1046 'queue_entry ids:%s' % (agent.host_ids, 1047 agent.queue_entry_ids)) 1048 if not agent.started: 1049 if not self._can_start_agent(agent, have_reached_limit): 1050 have_reached_limit = True 1051 logging.debug('Reached Limit of allowed running Agents.') 1052 continue 1053 num_started_this_tick += agent.task.num_processes 1054 self._log_extra_msg('Starting Agent') 1055 agent.tick() 1056 self._log_extra_msg('Agent tick completed.') 1057 if agent.is_done(): 1058 num_finished_this_tick += agent.task.num_processes 1059 self._log_extra_msg("Agent finished") 1060 self.remove_agent(agent) 1061 1062 metrics.Counter( 1063 'chromeos/autotest/scheduler/agent_processes_started' 1064 ).increment_by(num_started_this_tick) 1065 metrics.Counter( 1066 'chromeos/autotest/scheduler/agent_processes_finished' 1067 ).increment_by(num_finished_this_tick) 1068 num_agent_processes = _drone_manager.total_running_processes() 1069 metrics.Gauge( 1070 'chromeos/autotest/scheduler/agent_processes' 1071 ).set(num_agent_processes) 1072 logging.info('%d running processes. %d added this tick.', 1073 num_agent_processes, num_started_this_tick) 1074 1075 1076 def _log_tick_msg(self, msg): 1077 if self._tick_debug: 1078 logging.debug(msg) 1079 1080 1081 def _log_extra_msg(self, msg): 1082 if self._extra_debugging: 1083 logging.debug(msg) 1084 1085 1086SiteDispatcher = utils.import_site_class( 1087 __file__, 'autotest_lib.scheduler.site_monitor_db', 1088 'SiteDispatcher', BaseDispatcher) 1089 1090class Dispatcher(SiteDispatcher): 1091 pass 1092 1093 1094class Agent(object): 1095 """ 1096 An agent for use by the Dispatcher class to perform a task. An agent wraps 1097 around an AgentTask mainly to associate the AgentTask with the queue_entry 1098 and host ids. 1099 1100 The following methods are required on all task objects: 1101 poll() - Called periodically to let the task check its status and 1102 update its internal state. If the task succeeded. 1103 is_done() - Returns True if the task is finished. 1104 abort() - Called when an abort has been requested. The task must 1105 set its aborted attribute to True if it actually aborted. 1106 1107 The following attributes are required on all task objects: 1108 aborted - bool, True if this task was aborted. 1109 success - bool, True if this task succeeded. 1110 queue_entry_ids - A sequence of HostQueueEntry ids this task handles. 1111 host_ids - A sequence of Host ids this task represents. 1112 """ 1113 1114 1115 def __init__(self, task): 1116 """ 1117 @param task: An instance of an AgentTask. 1118 """ 1119 self.task = task 1120 1121 # This is filled in by Dispatcher.add_agent() 1122 self.dispatcher = None 1123 1124 self.queue_entry_ids = task.queue_entry_ids 1125 self.host_ids = task.host_ids 1126 1127 self.started = False 1128 self.finished = False 1129 1130 1131 def tick(self): 1132 self.started = True 1133 if not self.finished: 1134 self.task.poll() 1135 if self.task.is_done(): 1136 self.finished = True 1137 1138 1139 def is_done(self): 1140 return self.finished 1141 1142 1143 def abort(self): 1144 if self.task: 1145 self.task.abort() 1146 if self.task.aborted: 1147 # tasks can choose to ignore aborts 1148 self.finished = True 1149 1150 1151class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals): 1152 """ 1153 Common functionality for QueueTask and HostlessQueueTask 1154 """ 1155 def __init__(self, queue_entries): 1156 super(AbstractQueueTask, self).__init__() 1157 self.job = queue_entries[0].job 1158 self.queue_entries = queue_entries 1159 1160 1161 def _keyval_path(self): 1162 return os.path.join(self._working_directory(), self._KEYVAL_FILE) 1163 1164 1165 def _write_control_file(self, execution_path): 1166 control_path = _drone_manager.attach_file_to_execution( 1167 execution_path, self.job.control_file) 1168 return control_path 1169 1170 1171 # TODO: Refactor into autoserv_utils. crbug.com/243090 1172 def _command_line(self): 1173 execution_path = self.queue_entries[0].execution_path() 1174 control_path = self._write_control_file(execution_path) 1175 hostnames = ','.join(entry.host.hostname 1176 for entry in self.queue_entries 1177 if not entry.is_hostless()) 1178 1179 execution_tag = self.queue_entries[0].execution_tag() 1180 params = _autoserv_command_line( 1181 hostnames, 1182 ['-P', execution_tag, '-n', 1183 _drone_manager.absolute_path(control_path)], 1184 job=self.job, verbose=False) 1185 if self.job.is_image_update_job(): 1186 params += ['--image', self.job.update_image_path] 1187 1188 return params 1189 1190 1191 @property 1192 def num_processes(self): 1193 return len(self.queue_entries) 1194 1195 1196 @property 1197 def owner_username(self): 1198 return self.job.owner 1199 1200 1201 def _working_directory(self): 1202 return self._get_consistent_execution_path(self.queue_entries) 1203 1204 1205 def prolog(self): 1206 queued_key, queued_time = self._job_queued_keyval(self.job) 1207 keyval_dict = self.job.keyval_dict() 1208 keyval_dict[queued_key] = queued_time 1209 self._write_keyvals_before_job(keyval_dict) 1210 for queue_entry in self.queue_entries: 1211 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING) 1212 queue_entry.set_started_on_now() 1213 1214 1215 def _write_lost_process_error_file(self): 1216 error_file_path = os.path.join(self._working_directory(), 'job_failure') 1217 _drone_manager.write_lines_to_file(error_file_path, 1218 [_LOST_PROCESS_ERROR]) 1219 1220 1221 def _finish_task(self): 1222 if not self.monitor: 1223 return 1224 1225 self._write_job_finished() 1226 1227 if self.monitor.lost_process: 1228 self._write_lost_process_error_file() 1229 1230 1231 def _write_status_comment(self, comment): 1232 _drone_manager.write_lines_to_file( 1233 os.path.join(self._working_directory(), 'status.log'), 1234 ['INFO\t----\t----\t' + comment], 1235 paired_with_process=self.monitor.get_process()) 1236 1237 1238 def _log_abort(self): 1239 if not self.monitor or not self.monitor.has_process(): 1240 return 1241 1242 # build up sets of all the aborted_by and aborted_on values 1243 aborted_by, aborted_on = set(), set() 1244 for queue_entry in self.queue_entries: 1245 if queue_entry.aborted_by: 1246 aborted_by.add(queue_entry.aborted_by) 1247 t = int(time.mktime(queue_entry.aborted_on.timetuple())) 1248 aborted_on.add(t) 1249 1250 # extract some actual, unique aborted by value and write it out 1251 # TODO(showard): this conditional is now obsolete, we just need to leave 1252 # it in temporarily for backwards compatibility over upgrades. delete 1253 # soon. 1254 assert len(aborted_by) <= 1 1255 if len(aborted_by) == 1: 1256 aborted_by_value = aborted_by.pop() 1257 aborted_on_value = max(aborted_on) 1258 else: 1259 aborted_by_value = 'autotest_system' 1260 aborted_on_value = int(time.time()) 1261 1262 self._write_keyval_after_job("aborted_by", aborted_by_value) 1263 self._write_keyval_after_job("aborted_on", aborted_on_value) 1264 1265 aborted_on_string = str(datetime.datetime.fromtimestamp( 1266 aborted_on_value)) 1267 self._write_status_comment('Job aborted by %s on %s' % 1268 (aborted_by_value, aborted_on_string)) 1269 1270 1271 def abort(self): 1272 super(AbstractQueueTask, self).abort() 1273 self._log_abort() 1274 self._finish_task() 1275 1276 1277 def epilog(self): 1278 super(AbstractQueueTask, self).epilog() 1279 self._finish_task() 1280 1281 1282class QueueTask(AbstractQueueTask): 1283 def __init__(self, queue_entries): 1284 super(QueueTask, self).__init__(queue_entries) 1285 self._set_ids(queue_entries=queue_entries) 1286 self._enable_ssp_container = ( 1287 global_config.global_config.get_config_value( 1288 'AUTOSERV', 'enable_ssp_container', type=bool, 1289 default=True)) 1290 1291 1292 def prolog(self): 1293 self._check_queue_entry_statuses( 1294 self.queue_entries, 1295 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING, 1296 models.HostQueueEntry.Status.RUNNING), 1297 allowed_host_statuses=(models.Host.Status.PENDING, 1298 models.Host.Status.RUNNING)) 1299 1300 super(QueueTask, self).prolog() 1301 1302 for queue_entry in self.queue_entries: 1303 self._write_host_keyvals(queue_entry.host) 1304 queue_entry.host.set_status(models.Host.Status.RUNNING) 1305 queue_entry.host.update_field('dirty', 1) 1306 1307 1308 def _finish_task(self): 1309 super(QueueTask, self)._finish_task() 1310 1311 for queue_entry in self.queue_entries: 1312 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING) 1313 queue_entry.host.set_status(models.Host.Status.RUNNING) 1314 1315 1316 def _command_line(self): 1317 invocation = super(QueueTask, self)._command_line() 1318 # Check if server-side packaging is needed. 1319 if (self._enable_ssp_container and 1320 self.job.control_type == control_data.CONTROL_TYPE.SERVER and 1321 self.job.require_ssp != False): 1322 invocation += ['--require-ssp'] 1323 keyval_dict = self.job.keyval_dict() 1324 test_source_build = keyval_dict.get('test_source_build', None) 1325 if test_source_build: 1326 invocation += ['--test_source_build', test_source_build] 1327 if self.job.parent_job_id: 1328 invocation += ['--parent_job_id', str(self.job.parent_job_id)] 1329 return invocation + ['--verify_job_repo_url'] 1330 1331 1332class HostlessQueueTask(AbstractQueueTask): 1333 def __init__(self, queue_entry): 1334 super(HostlessQueueTask, self).__init__([queue_entry]) 1335 self.queue_entry_ids = [queue_entry.id] 1336 1337 1338 def prolog(self): 1339 super(HostlessQueueTask, self).prolog() 1340 1341 1342 def _finish_task(self): 1343 super(HostlessQueueTask, self)._finish_task() 1344 1345 # When a job is added to database, its initial status is always 1346 # Starting. In a scheduler tick, scheduler finds all jobs in Starting 1347 # status, check if any of them can be started. If scheduler hits some 1348 # limit, e.g., max_hostless_jobs_per_drone, scheduler will 1349 # leave these jobs in Starting status. Otherwise, the jobs' 1350 # status will be changed to Running, and an autoserv process 1351 # will be started in drone for each of these jobs. 1352 # If the entry is still in status Starting, the process has not started 1353 # yet. Therefore, there is no need to parse and collect log. Without 1354 # this check, exception will be raised by scheduler as execution_subdir 1355 # for this queue entry does not have a value yet. 1356 hqe = self.queue_entries[0] 1357 if hqe.status != models.HostQueueEntry.Status.STARTING: 1358 hqe.set_status(models.HostQueueEntry.Status.PARSING) 1359 1360 1361if __name__ == '__main__': 1362 main() 1363