1#!/usr/bin/python
2
3#pylint: disable=C0111
4
5"""
6Autotest scheduler
7"""
8
9import datetime
10import functools
11import gc
12import logging
13import optparse
14import os
15import signal
16import sys
17import time
18
19import common
20from autotest_lib.frontend import setup_django_environment
21
22import django.db
23
24from autotest_lib.client.common_lib import control_data
25from autotest_lib.client.common_lib import global_config
26from autotest_lib.client.common_lib import utils
27from autotest_lib.frontend.afe import models
28from autotest_lib.scheduler import agent_task, drone_manager
29from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
30from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
31from autotest_lib.scheduler import postjob_task
32from autotest_lib.scheduler import query_managers
33from autotest_lib.scheduler import scheduler_lib
34from autotest_lib.scheduler import scheduler_models
35from autotest_lib.scheduler import status_server, scheduler_config
36from autotest_lib.server import autoserv_utils
37from autotest_lib.server import system_utils
38from autotest_lib.server import utils as server_utils
39from autotest_lib.site_utils import metadata_reporter
40from autotest_lib.site_utils import server_manager_utils
41
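# chromite provides the real metrics and ts_mon implementations. When it is
# not importable (e.g. outside environments where chromite is checked out),
# fall back to the mock object, which effectively turns metric calls into
# no-ops.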
42try:
43    from chromite.lib import metrics
44    from chromite.lib import ts_mon_config
45except ImportError:
46    metrics = utils.metrics_mock
47    ts_mon_config = utils.metrics_mock
48
49
50BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
51PID_FILE_PREFIX = 'monitor_db'
52
53RESULTS_DIR = '.'
54AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
55
if 'AUTOTEST_DIR' in os.environ:
57    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
58AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
59AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
60
61if AUTOTEST_SERVER_DIR not in sys.path:
62    sys.path.insert(0, AUTOTEST_SERVER_DIR)
63
64# error message to leave in results dir when an autoserv process disappears
65# mysteriously
66_LOST_PROCESS_ERROR = """\
67Autoserv failed abnormally during execution for this job, probably due to a
68system error on the Autotest server.  Full results may not be available.  Sorry.
69"""
70
71_db_manager = None
72_db = None
73_shutdown = False
74
# These two autoserv-related globals are replaced for testing
76_autoserv_directory = autoserv_utils.autoserv_directory
77_autoserv_path = autoserv_utils.autoserv_path
78_testing_mode = False
79_drone_manager = None
80
81
82def _site_init_monitor_db_dummy():
83    return {}
84
85
86def _verify_default_drone_set_exists():
87    if (models.DroneSet.drone_sets_enabled() and
88            not models.DroneSet.default_drone_set_name()):
89        raise scheduler_lib.SchedulerError(
90                'Drone sets are enabled, but no default is set')
91
92
93def _sanity_check():
94    """Make sure the configs are consistent before starting the scheduler"""
95    _verify_default_drone_set_exists()
96
97
98def main():
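    """Entry point: run the scheduler and always clean up the pid file.

    Any exception other than SystemExit is logged before being re-raised, and
    the scheduler's pid file is removed on the way out.
    """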
99    try:
100        try:
101            main_without_exception_handling()
102        except SystemExit:
103            raise
104        except:
105            logging.exception('Exception escaping in monitor_db')
106            raise
107    finally:
108        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
109
110
111def main_without_exception_handling():
112    scheduler_lib.setup_logging(
113            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
114            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
115    usage = 'usage: %prog [options] results_dir'
116    parser = optparse.OptionParser(usage)
117    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
118                      action='store_true')
119    parser.add_option('--test', help='Indicate that scheduler is under ' +
120                      'test and should use dummy autoserv and no parsing',
121                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that the scheduler is running in a '
                            'production environment and can use a database '
                            'that is not hosted on localhost. If not set, '
                            'the scheduler will fail if the database is not '
                            'on localhost.'),
                      action='store_true', default=False)
129    (options, args) = parser.parse_args()
130    if len(args) != 1:
131        parser.print_usage()
132        return
133
134    scheduler_lib.check_production_settings(options)
135
136    scheduler_enabled = global_config.global_config.get_config_value(
137        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
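    # The scheduler reads several knobs from the SCHEDULER section of
    # global_config, e.g. (illustrative values only):
    #   [SCHEDULER]
    #   enable_scheduler: True
    #   minimum_tick_sec: 5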
138
139    if not scheduler_enabled:
        logging.error("Scheduler is not enabled; set enable_scheduler to "
                      "true in the SCHEDULER section of global_config to "
                      "enable it. Exiting.")
143        sys.exit(1)
144
145    global RESULTS_DIR
146    RESULTS_DIR = args[0]
147
148    site_init = utils.import_site_function(__file__,
149        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
150        _site_init_monitor_db_dummy)
151    site_init()
152
    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
156    os.chdir(RESULTS_DIR)
157
    # Logging the environment is helpful for debugging why processes the
    # scheduler launches are misbehaving.
160    logging.info('os.environ: %s', os.environ)
161
162    if options.test:
163        global _autoserv_path
164        _autoserv_path = 'autoserv_dummy'
165        global _testing_mode
166        _testing_mode = True
167
168    server = status_server.StatusServer()
169    server.start()
170
171    # Start the thread to report metadata.
172    metadata_reporter.start()
173
174    with ts_mon_config.SetupTsMonGlobalState('autotest_scheduler',
175                                             indirect=True):
176      try:
177          initialize()
178          dispatcher = Dispatcher()
179          dispatcher.initialize(recover_hosts=options.recover_hosts)
180          minimum_tick_sec = global_config.global_config.get_config_value(
181                  scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)
182
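          # Pace the main loop: if a tick finishes faster than
          # minimum_tick_sec, sleep off the difference so an idle scheduler
          # does not spin against the database.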
183          while not _shutdown and not server._shutdown_scheduler:
184              start = time.time()
185              dispatcher.tick()
186              curr_tick_sec = time.time() - start
187              if minimum_tick_sec > curr_tick_sec:
188                  time.sleep(minimum_tick_sec - curr_tick_sec)
189              else:
190                  time.sleep(0.0001)
191      except server_manager_utils.ServerActionError as e:
          # This error is expected when the server does not have primary
          # status for the scheduler role, so do not send email for it.
194          logging.exception(e)
195      except Exception:
196          email_manager.manager.log_stacktrace(
197              "Uncaught exception; terminating monitor_db")
198
199    metadata_reporter.abort()
200    email_manager.manager.send_queued_emails()
201    server.shutdown()
202    _drone_manager.shutdown()
203    _db_manager.disconnect()
204
205
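# handle_signal is installed for SIGINT and SIGTERM in initialize(). It only
# flips the module-level _shutdown flag, so the main loop exits cleanly at the
# next tick boundary instead of dying mid-tick.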
206def handle_signal(signum, frame):
207    global _shutdown
208    _shutdown = True
209    logging.info("Shutdown request received.")
210
211
212def initialize():
213    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
214    logging.info("My PID is %d", os.getpid())
215
216    if utils.program_is_alive(PID_FILE_PREFIX):
217        logging.critical("monitor_db already running, aborting!")
218        sys.exit(1)
219    utils.write_pid(PID_FILE_PREFIX)
220
221    if _testing_mode:
222        global_config.global_config.override_config_value(
223            scheduler_lib.DB_CONFIG_SECTION, 'database',
224            'stresstest_autotest_web')
225
    # If the server database is enabled, check if the server has the
    # `scheduler` role. If it does not, an exception will be raised and the
    # scheduler will not continue to run.
229    if server_manager_utils.use_server_db():
230        server_manager_utils.confirm_server_has_role(hostname='localhost',
231                                                     role='scheduler')
232
233    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
234    global _db_manager
235    _db_manager = scheduler_lib.ConnectionManager()
236    global _db
237    _db = _db_manager.get_connection()
238    logging.info("Setting signal handler")
239    signal.signal(signal.SIGINT, handle_signal)
240    signal.signal(signal.SIGTERM, handle_signal)
241
242    initialize_globals()
243    scheduler_models.initialize()
244
245    drone_list = system_utils.get_drones()
246    results_host = global_config.global_config.get_config_value(
247        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
248    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
249
250    logging.info("Connected! Running...")
251
252
253def initialize_globals():
254    global _drone_manager
255    _drone_manager = drone_manager.instance()
256
257
258def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
259                           verbose=True):
260    """
261    @returns The autoserv command line as a list of executable + parameters.
262
263    @param machines - string - A machine or comma separated list of machines
264            for the (-m) flag.
265    @param extra_args - list - Additional arguments to pass to autoserv.
266    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
267            and client -c or server -s parameters will be added.
268    @param queue_entry - A HostQueueEntry object - If supplied and no Job
269            object was supplied, this will be used to lookup the Job object.
270    """
271    command = autoserv_utils.autoserv_run_job_command(_autoserv_directory,
272            machines, results_directory=drone_manager.WORKING_DIRECTORY,
273            extra_args=extra_args, job=job, queue_entry=queue_entry,
274            verbose=verbose, in_lab=True)
275    return command
276
277def _calls_log_tick_msg(func):
278    """Used to trace functions called by BaseDispatcher.tick."""
279    @functools.wraps(func)
280    def wrapper(self, *args, **kwargs):
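        # Emits e.g. 'Starting _run_cleanup' before the decorated step runs
        # (only when tick_debug is enabled, since _log_tick_msg is gated on
        # that flag).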
281        self._log_tick_msg('Starting %s' % func.__name__)
282        return func(self, *args, **kwargs)
283
284    return wrapper
285
286
287class BaseDispatcher(object):
288
289
290    def __init__(self):
291        self._agents = []
292        self._last_clean_time = time.time()
293        user_cleanup_time = scheduler_config.config.clean_interval_minutes
294        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
295                _db, user_cleanup_time)
296        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
297                _db, _drone_manager)
298        self._host_agents = {}
299        self._queue_entry_agents = {}
300        self._tick_count = 0
301        self._last_garbage_stats_time = time.time()
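        # gc_stats_interval_mins is configured in minutes (default: six
        # hours); convert to seconds for comparison against time.time().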
302        self._seconds_between_garbage_stats = 60 * (
303                global_config.global_config.get_config_value(
304                        scheduler_config.CONFIG_SECTION,
305                        'gc_stats_interval_mins', type=int, default=6*60))
306        self._tick_debug = global_config.global_config.get_config_value(
307                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
308                default=False)
309        self._extra_debugging = global_config.global_config.get_config_value(
310                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
311                default=False)
312        self._inline_host_acquisition = (
313                global_config.global_config.get_config_value(
314                        scheduler_config.CONFIG_SECTION,
315                        'inline_host_acquisition', type=bool, default=True))
316
        # If _inline_host_acquisition is set, the scheduler will acquire and
        # release hosts against jobs inline with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly release the lease on a host when a job finishes
        # using it.
321        self._job_query_manager = query_managers.AFEJobQueryManager()
322        self._host_scheduler = (host_scheduler.BaseHostScheduler()
323                                if self._inline_host_acquisition else
324                                host_scheduler.DummyHostScheduler())
325
326
327    def initialize(self, recover_hosts=True):
328        self._periodic_cleanup.initialize()
329        self._24hr_upkeep.initialize()
330        # Execute all actions queued in the cleanup tasks. Scheduler tick will
331        # run a refresh task first. If there is any action in the queue, refresh
332        # will raise an exception.
333        _drone_manager.execute_actions()
334
335        # always recover processes
336        self._recover_processes()
337
338        if recover_hosts:
339            self._recover_hosts()
340
341
342    # TODO(pprabhu) Drop this metric once tick_times has been verified.
343    @metrics.SecondsTimerDecorator(
344            'chromeos/autotest/scheduler/tick_durations/tick')
345    def tick(self):
346        """
347        This is an altered version of tick() where we keep track of when each
348        major step begins so we can try to figure out where we are using most
349        of the tick time.
350        """
351        with metrics.RuntimeBreakdownTimer(
352            'chromeos/autotest/scheduler/tick_times') as breakdown_timer:
353            self._log_tick_msg('New tick')
354            system_utils.DroneCache.refresh()
355
356            with breakdown_timer.Step('garbage_collection'):
357                self._garbage_collection()
358            with breakdown_timer.Step('trigger_refresh'):
359                self._log_tick_msg('Starting _drone_manager.trigger_refresh')
360                _drone_manager.trigger_refresh()
361            with breakdown_timer.Step('schedule_running_host_queue_entries'):
362                self._schedule_running_host_queue_entries()
363            with breakdown_timer.Step('schedule_special_tasks'):
364                self._schedule_special_tasks()
365            with breakdown_timer.Step('schedule_new_jobs'):
366                self._schedule_new_jobs()
367            with breakdown_timer.Step('sync_refresh'):
368                self._log_tick_msg('Starting _drone_manager.sync_refresh')
369                _drone_manager.sync_refresh()
            # _run_cleanup must be called between drone_manager.sync_refresh
            # and drone_manager.execute_actions, as sync_refresh will clear
            # the calls queued in drones. Therefore, any action that calls
            # drone.queue_call to add calls to drone._calls should happen
            # after the drone refresh has completed and before
            # drone_manager.execute_actions at the end of the tick.
376            with breakdown_timer.Step('run_cleanup'):
377                self._run_cleanup()
378            with breakdown_timer.Step('find_aborting'):
379                self._find_aborting()
380            with breakdown_timer.Step('find_aborted_special_tasks'):
381                self._find_aborted_special_tasks()
382            with breakdown_timer.Step('handle_agents'):
383                self._handle_agents()
384            with breakdown_timer.Step('host_scheduler_tick'):
385                self._log_tick_msg('Starting _host_scheduler.tick')
386                self._host_scheduler.tick()
387            with breakdown_timer.Step('drones_execute_actions'):
388                self._log_tick_msg('Starting _drone_manager.execute_actions')
389                _drone_manager.execute_actions()
390            with breakdown_timer.Step('send_queued_emails'):
391                self._log_tick_msg(
392                    'Starting email_manager.manager.send_queued_emails')
393                email_manager.manager.send_queued_emails()
394            with breakdown_timer.Step('db_reset_queries'):
395                self._log_tick_msg('Starting django.db.reset_queries')
396                django.db.reset_queries()
397
398            self._tick_count += 1
399            metrics.Counter('chromeos/autotest/scheduler/tick').increment()
400
401
402    @_calls_log_tick_msg
403    def _run_cleanup(self):
404        self._periodic_cleanup.run_cleanup_maybe()
405        self._24hr_upkeep.run_cleanup_maybe()
406
407
408    @_calls_log_tick_msg
409    def _garbage_collection(self):
410        threshold_time = time.time() - self._seconds_between_garbage_stats
411        if threshold_time < self._last_garbage_stats_time:
412            # Don't generate these reports very often.
413            return
414
415        self._last_garbage_stats_time = time.time()
        # Force a full collection (because we can; it doesn't hurt at this
        # interval).
418        gc.collect()
419        logging.info('Logging garbage collector stats on tick %d.',
420                     self._tick_count)
421        gc_stats._log_garbage_collector_stats()
422
423
424    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
425        for object_id in object_ids:
426            agent_dict.setdefault(object_id, set()).add(agent)
427
428
429    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
430        for object_id in object_ids:
431            assert object_id in agent_dict
432            agent_dict[object_id].remove(agent)
            # If an ID has no more active agents associated with it, there is
            # no need to keep it in the dictionary. Otherwise, the scheduler
            # will keep an unnecessarily big dictionary until it is restarted.
436            if not agent_dict[object_id]:
437                agent_dict.pop(object_id)
438
439
440    def add_agent_task(self, agent_task):
441        """
        Creates and adds an agent to the dispatcher's list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. Every agent we create is
        registered in two dicts: one keyed by the queue_entry_ids given to it
        and one keyed by the host_ids given to it. So theoretically a host can
        have any number of agents associated with it, and each of them can
        have any special agent task, though in practice we never see more
        than one agent/task per host at any time.
451
452        @param agent_task: A SpecialTask for the agent to manage.
453        """
454        agent = Agent(agent_task)
455        self._agents.append(agent)
456        agent.dispatcher = self
457        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
458        self._register_agent_for_ids(self._queue_entry_agents,
459                                     agent.queue_entry_ids, agent)
460
461
462    def get_agents_for_entry(self, queue_entry):
463        """
464        Find agents corresponding to the specified queue_entry.
465        """
466        return list(self._queue_entry_agents.get(queue_entry.id, set()))
467
468
469    def host_has_agent(self, host):
470        """
471        Determine if there is currently an Agent present using this host.
472        """
473        return bool(self._host_agents.get(host.id, None))
474
475
476    def remove_agent(self, agent):
477        self._agents.remove(agent)
478        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
479                                       agent)
480        self._unregister_agent_for_ids(self._queue_entry_agents,
481                                       agent.queue_entry_ids, agent)
482
483
484    def _host_has_scheduled_special_task(self, host):
485        return bool(models.SpecialTask.objects.filter(host__id=host.id,
486                                                      is_active=False,
487                                                      is_complete=False))
488
489
490    def _recover_processes(self):
491        agent_tasks = self._create_recovery_agent_tasks()
492        self._register_pidfiles(agent_tasks)
493        _drone_manager.refresh()
494        self._recover_tasks(agent_tasks)
495        self._recover_pending_entries()
496        self._check_for_unrecovered_verifying_entries()
497        self._reverify_remaining_hosts()
498        # reinitialize drones after killing orphaned processes, since they can
499        # leave around files when they die
500        _drone_manager.execute_actions()
501        _drone_manager.reinitialize_drones()
502
503
504    def _create_recovery_agent_tasks(self):
505        return (self._get_queue_entry_agent_tasks()
506                + self._get_special_task_agent_tasks(is_active=True))
507
508
509    def _get_queue_entry_agent_tasks(self):
510        """
511        Get agent tasks for all hqe in the specified states.
512
513        Loosely this translates to taking a hqe in one of the specified states,
514        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
515        through _get_agent_task_for_queue_entry. Each queue entry can only have
516        one agent task at a time, but there might be multiple queue entries in
517        the group.
518
519        @return: A list of AgentTasks.
520        """
521        # host queue entry statuses handled directly by AgentTasks (Verifying is
522        # handled through SpecialTasks, so is not listed here)
523        statuses = (models.HostQueueEntry.Status.STARTING,
524                    models.HostQueueEntry.Status.RUNNING,
525                    models.HostQueueEntry.Status.GATHERING,
526                    models.HostQueueEntry.Status.PARSING,
527                    models.HostQueueEntry.Status.ARCHIVING)
528        status_list = ','.join("'%s'" % status for status in statuses)
529        queue_entries = scheduler_models.HostQueueEntry.fetch(
530                where='status IN (%s)' % status_list)
531
532        agent_tasks = []
533        used_queue_entries = set()
534        hqe_count_by_status = {}
535        for entry in queue_entries:
536            hqe_count_by_status[entry.status] = (
537                hqe_count_by_status.get(entry.status, 0) + 1)
538            if self.get_agents_for_entry(entry):
539                # already being handled
540                continue
541            if entry in used_queue_entries:
542                # already picked up by a synchronous job
543                continue
544            agent_task = self._get_agent_task_for_queue_entry(entry)
545            agent_tasks.append(agent_task)
546            used_queue_entries.update(agent_task.queue_entries)
547
548        for status, count in hqe_count_by_status.iteritems():
549            metrics.Gauge(
550                'chromeos/autotest/scheduler/active_host_queue_entries'
551            ).set(count, fields={'status': status})
552
553        return agent_tasks
554
555
556    def _get_special_task_agent_tasks(self, is_active=False):
557        special_tasks = models.SpecialTask.objects.filter(
558                is_active=is_active, is_complete=False)
559        return [self._get_agent_task_for_special_task(task)
560                for task in special_tasks]
561
562
563    def _get_agent_task_for_queue_entry(self, queue_entry):
564        """
565        Construct an AgentTask instance for the given active HostQueueEntry.
566
567        @param queue_entry: a HostQueueEntry
568        @return: an AgentTask to run the queue entry
569        """
570        task_entries = queue_entry.job.get_group_entries(queue_entry)
571        self._check_for_duplicate_host_entries(task_entries)
572
573        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
574                                  models.HostQueueEntry.Status.RUNNING):
575            if queue_entry.is_hostless():
576                return HostlessQueueTask(queue_entry=queue_entry)
577            return QueueTask(queue_entries=task_entries)
578        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
579            return postjob_task.GatherLogsTask(queue_entries=task_entries)
580        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
581            return postjob_task.FinalReparseTask(queue_entries=task_entries)
582        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
583            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)
584
585        raise scheduler_lib.SchedulerError(
586                '_get_agent_task_for_queue_entry got entry with '
587                'invalid status %s: %s' % (queue_entry.status, queue_entry))
588
589
590    def _check_for_duplicate_host_entries(self, task_entries):
591        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
592                             models.HostQueueEntry.Status.ARCHIVING)
593        for task_entry in task_entries:
594            using_host = (task_entry.host is not None
595                          and task_entry.status not in non_host_statuses)
596            if using_host:
597                self._assert_host_has_no_agent(task_entry)
598
599
600    def _assert_host_has_no_agent(self, entry):
601        """
602        @param entry: a HostQueueEntry or a SpecialTask
603        """
604        if self.host_has_agent(entry.host):
605            agent = tuple(self._host_agents.get(entry.host.id))[0]
606            raise scheduler_lib.SchedulerError(
607                    'While scheduling %s, host %s already has a host agent %s'
608                    % (entry, entry.host, agent.task))
609
610
611    def _get_agent_task_for_special_task(self, special_task):
612        """
        Construct an AgentTask to run the given SpecialTask.
615
616        A special task is created through schedule_special_tasks, but only if
617        the host doesn't already have an agent. This happens through
618        add_agent_task. All special agent tasks are given a host on creation,
        and a null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a
        SpecialAgentTask object contains an hqe, it is passed on to the
        special agent task, which creates a HostQueueEntry and saves it as
        its queue_entry.
623
624        @param special_task: a models.SpecialTask instance
625        @returns an AgentTask to run this SpecialTask
626        """
627        self._assert_host_has_no_agent(special_task)
628
629        special_agent_task_classes = (prejob_task.CleanupTask,
630                                      prejob_task.VerifyTask,
631                                      prejob_task.RepairTask,
632                                      prejob_task.ResetTask,
633                                      prejob_task.ProvisionTask)
634
635        for agent_task_class in special_agent_task_classes:
636            if agent_task_class.TASK_TYPE == special_task.task:
637                return agent_task_class(task=special_task)
638
639        raise scheduler_lib.SchedulerError(
640                'No AgentTask class for task', str(special_task))
641
642
643    def _register_pidfiles(self, agent_tasks):
644        for agent_task in agent_tasks:
645            agent_task.register_necessary_pidfiles()
646
647
648    def _recover_tasks(self, agent_tasks):
649        orphans = _drone_manager.get_orphaned_autoserv_processes()
650
651        for agent_task in agent_tasks:
652            agent_task.recover()
653            if agent_task.monitor and agent_task.monitor.has_process():
654                orphans.discard(agent_task.monitor.get_process())
655            self.add_agent_task(agent_task)
656
657        self._check_for_remaining_orphan_processes(orphans)
658
659
660    def _get_unassigned_entries(self, status):
661        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
662                                                           % status):
663            if entry.status == status and not self.get_agents_for_entry(entry):
664                # The status can change during iteration, e.g., if job.run()
665                # sets a group of queue entries to Starting
666                yield entry
667
668
669    def _check_for_remaining_orphan_processes(self, orphans):
670        m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
671        metrics.Gauge(m).set(len(orphans))
672
673        if not orphans:
674            return
675        subject = 'Unrecovered orphan autoserv processes remain'
676        message = '\n'.join(str(process) for process in orphans)
677        die_on_orphans = global_config.global_config.get_config_value(
678            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
679
680        if die_on_orphans:
681            raise RuntimeError(subject + '\n' + message)
682
683
684    def _recover_pending_entries(self):
685        for entry in self._get_unassigned_entries(
686                models.HostQueueEntry.Status.PENDING):
687            logging.info('Recovering Pending entry %s', entry)
688            entry.on_pending()
689
690
691    def _check_for_unrecovered_verifying_entries(self):
692        queue_entries = scheduler_models.HostQueueEntry.fetch(
693                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
694        unrecovered_hqes = []
695        for queue_entry in queue_entries:
696            special_tasks = models.SpecialTask.objects.filter(
697                    task__in=(models.SpecialTask.Task.CLEANUP,
698                              models.SpecialTask.Task.VERIFY),
699                    queue_entry__id=queue_entry.id,
700                    is_complete=False)
701            if special_tasks.count() == 0:
702                unrecovered_hqes.append(queue_entry)
703
704        if unrecovered_hqes:
705            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
706            raise scheduler_lib.SchedulerError(
707                    '%d unrecovered verifying host queue entries:\n%s' %
708                    (len(unrecovered_hqes), message))
709
710
711    @_calls_log_tick_msg
712    def _schedule_special_tasks(self):
713        """
714        Execute queued SpecialTasks that are ready to run on idle hosts.
715
716        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with an
        hqe. This method translates SpecialTasks to the appropriate AgentTask
        and adds them to the dispatcher's agents list, so _handle_agents can
        execute them.
721        """
722        # When the host scheduler is responsible for acquisition we only want
723        # to run tasks with leased hosts. All hqe tasks will already have
        # leased hosts, and we don't want to run frontend tasks until the host
725        # scheduler has vetted the assignment. Note that this doesn't include
726        # frontend tasks with hosts leased by other active hqes.
727        for task in self._job_query_manager.get_prioritized_special_tasks(
728                only_tasks_with_leased_hosts=not self._inline_host_acquisition):
729            if self.host_has_agent(task.host):
730                continue
731            self.add_agent_task(self._get_agent_task_for_special_task(task))
732
733
734    def _reverify_remaining_hosts(self):
735        # recover active hosts that have not yet been recovered, although this
736        # should never happen
737        message = ('Recovering active host %s - this probably indicates a '
738                   'scheduler bug')
739        self._reverify_hosts_where(
740                "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
741                print_message=message)
742
743
744    def _reverify_hosts_where(self, where,
745                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
747        for host in scheduler_models.Host.fetch(where=full_where):
748            if self.host_has_agent(host):
749                # host has already been recovered in some way
750                continue
751            if self._host_has_scheduled_special_task(host):
752                # host will have a special task scheduled on the next tick
753                continue
754            if print_message:
755                logging.info(print_message, host.hostname)
756            models.SpecialTask.objects.create(
757                    task=models.SpecialTask.Task.CLEANUP,
758                    host=models.Host.objects.get(id=host.id))
759
760
761    def _recover_hosts(self):
762        # recover "Repair Failed" hosts
763        message = 'Reverifying dead host %s'
764        self._reverify_hosts_where("status = 'Repair Failed'",
765                                   print_message=message)
766
767
768    def _refresh_pending_queue_entries(self):
769        """
        Look up the pending HostQueueEntries and return them.
772
773        @returns A list of pending HostQueueEntries sorted in priority order.
774        """
775        queue_entries = self._job_query_manager.get_pending_queue_entries(
776                only_hostless=not self._inline_host_acquisition)
777        if not queue_entries:
778            return []
779        return queue_entries
780
781
782    def _schedule_hostless_job(self, queue_entry):
783        """Schedule a hostless (suite) job.
784
785        @param queue_entry: The queue_entry representing the hostless job.
786        """
787        self.add_agent_task(HostlessQueueTask(queue_entry))
788
789        # Need to set execution_subdir before setting the status:
790        # After a restart of the scheduler, agents will be restored for HQEs in
791        # Starting, Running, Gathering, Parsing or Archiving. To do this, the
792        # execution_subdir is needed. Therefore it must be set before entering
793        # one of these states.
        # Otherwise, if the scheduler was interrupted between setting the
        # status and the execution_subdir, restoring agents upon its restart
        # would fail.
797        # Is there a way to get a status in one of these states without going
798        # through this code? Following cases are possible:
799        # - If it's aborted before being started:
800        #     active bit will be 0, so there's nothing to parse, it will just be
801        #     set to completed by _find_aborting. Critical statuses are skipped.
802        # - If it's aborted or it fails after being started:
803        #     It was started, so this code was executed.
804        queue_entry.update_field('execution_subdir', 'hostless')
805        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
806
807
808    def _schedule_host_job(self, host, queue_entry):
809        """Schedules a job on the given host.
810
811        1. Assign the host to the hqe, if it isn't already assigned.
812        2. Create a SpecialAgentTask for the hqe.
813        3. Activate the hqe.
814
815        @param queue_entry: The job to schedule.
816        @param host: The host to schedule the job on.
817        """
818        if self.host_has_agent(host):
819            host_agent_task = list(self._host_agents.get(host.id))[0].task
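            # The host already has an agent, so do not schedule another job on
            # it this tick; host_agent_task is looked up here but not used
            # further.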
820        else:
821            self._host_scheduler.schedule_host_job(host, queue_entry)
822
823
824    @_calls_log_tick_msg
825    def _schedule_new_jobs(self):
826        """
        Find any new HQEs and call schedule_pre_job_tasks for each.
828
829        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
831        scheduler_models._queue_special_task. The new db row is then added as
832        an agent to the dispatcher through _schedule_special_tasks and
833        scheduled for execution on the drone through _handle_agents.
834        """
835        queue_entries = self._refresh_pending_queue_entries()
836
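        # 'key' is only used by the commented-out autotest_stats call at the
        # bottom of this method; it is presumably kept so that call can be
        # re-enabled.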
837        key = 'scheduler.jobs_per_tick'
838        new_hostless_jobs = 0
839        new_jobs_with_hosts = 0
840        new_jobs_need_hosts = 0
841        host_jobs = []
842        logging.debug('Processing %d queue_entries', len(queue_entries))
843
844        for queue_entry in queue_entries:
845            if queue_entry.is_hostless():
846                self._schedule_hostless_job(queue_entry)
847                new_hostless_jobs = new_hostless_jobs + 1
848            else:
849                host_jobs.append(queue_entry)
850                new_jobs_need_hosts = new_jobs_need_hosts + 1
851
852        metrics.Counter(
853            'chromeos/autotest/scheduler/scheduled_jobs_hostless'
854        ).increment_by(new_hostless_jobs)
855
856        if not host_jobs:
857            return
858
859        if not self._inline_host_acquisition:
            # In this case, host_scheduler is responsible for scheduling
            # host_jobs. Scheduling the jobs ourselves can lead to DB
            # corruption since host_scheduler assumes it is the single
            # process scheduling host jobs.
            metrics.Gauge(
                'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
                    len(host_jobs))
            return
868
869        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
870        for host_assignment in jobs_with_hosts:
871            self._schedule_host_job(host_assignment.host, host_assignment.job)
872            new_jobs_with_hosts = new_jobs_with_hosts + 1
873
874        metrics.Counter(
875            'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
876        ).increment_by(new_jobs_with_hosts)
877        # TODO(pprabhu): Decide what to do about this metric. Million dollar
        # question: What happens to jobs that were not matched? Do they stay
        # in the queue and get processed right here in the next tick (then we
        # want a gauge corresponding to the number of outstanding unmatched
        # host jobs), or are they handled somewhere else (then we need a
        # counter corresponding to failed_to_match_with_hosts jobs)?
883        #autotest_stats.Gauge(key).send('new_jobs_without_hosts',
884        #                               new_jobs_need_hosts -
885        #                               new_jobs_with_hosts)
886
887
888    @_calls_log_tick_msg
889    def _schedule_running_host_queue_entries(self):
890        """
891        Adds agents to the dispatcher.
892
893        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
894        QueueTask for example, will have a job with a control file, and
895        the agent will have methods that poll, abort and check if the queue
896        task is finished. The dispatcher runs the agent_task, as well as
        other agents in its _agents member, through _handle_agents, by
        calling the Agent's tick().
899
900        This method creates an agent for each HQE in one of (starting, running,
901        gathering, parsing, archiving) states, and adds it to the dispatcher so
902        it is handled by _handle_agents.
903        """
904        for agent_task in self._get_queue_entry_agent_tasks():
905            self.add_agent_task(agent_task)
906
907
908    @_calls_log_tick_msg
909    def _find_aborting(self):
910        """
911        Looks through the afe_host_queue_entries for an aborted entry.
912
913        The aborted bit is set on an HQE in many ways, the most common
914        being when a user requests an abort through the frontend, which
915        results in an rpc from the afe to abort_host_queue_entries.
916        """
917        jobs_to_stop = set()
918        for entry in scheduler_models.HostQueueEntry.fetch(
919                where='aborted=1 and complete=0'):
920
921            # If the job is running on a shard, let the shard handle aborting
922            # it and sync back the right status.
923            if entry.job.shard_id is not None and not server_utils.is_shard():
924                logging.info('Waiting for shard %s to abort hqe %s',
925                        entry.job.shard_id, entry)
926                continue
927
928            logging.info('Aborting %s', entry)
929
930            # The task would have started off with both is_complete and
931            # is_active = False. Aborted tasks are neither active nor complete.
932            # For all currently active tasks this will happen through the agent,
933            # but we need to manually update the special tasks that haven't
934            # started yet, because they don't have agents.
935            models.SpecialTask.objects.filter(is_active=False,
936                queue_entry_id=entry.id).update(is_complete=True)
937
938            for agent in self.get_agents_for_entry(entry):
939                agent.abort()
940            entry.abort(self)
941            jobs_to_stop.add(entry.job)
942        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
943        for job in jobs_to_stop:
944            job.stop_if_necessary()
945
946
947    @_calls_log_tick_msg
948    def _find_aborted_special_tasks(self):
949        """
        Find SpecialTasks that have been marked to be aborted.

        Poll the database looking for SpecialTasks that are active
        and have been marked to be aborted, then abort them.
954        """
955
956        # The completed and active bits are very important when it comes
957        # to scheduler correctness. The active bit is set through the prolog
958        # of a special task, and reset through the cleanup method of the
959        # SpecialAgentTask. The cleanup is called both through the abort and
960        # epilog. The complete bit is set in several places, and in general
961        # a hanging job will have is_active=1 is_complete=0, while a special
962        # task which completed will have is_active=0 is_complete=1. To check
963        # aborts we directly check active because the complete bit is set in
964        # several places, including the epilog of agent tasks.
965        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
966                                                          is_aborted=True)
967        for task in aborted_tasks:
968            # There are 2 ways to get the agent associated with a task,
969            # through the host and through the hqe. A special task
970            # always needs a host, but doesn't always need a hqe.
971            for agent in self._host_agents.get(task.host.id, []):
972                if isinstance(agent.task, agent_task.SpecialAgentTask):
973
                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe, etc.; however, it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
984                    agent.task.epilog()
985                    agent.task.abort()
986
987
988    def _can_start_agent(self, agent, have_reached_limit):
989        # always allow zero-process agents to run
990        if agent.task.num_processes == 0:
991            return True
992        # don't allow any nonzero-process agents to run after we've reached a
993        # limit (this avoids starvation of many-process agents)
994        if have_reached_limit:
995            return False
996        # total process throttling
997        max_runnable_processes = _drone_manager.max_runnable_processes(
998                agent.task.owner_username,
999                agent.task.get_drone_hostnames_allowed())
1000        if agent.task.num_processes > max_runnable_processes:
1001            return False
1002        return True
1003
1004
1005    @_calls_log_tick_msg
1006    def _handle_agents(self):
1007        """
1008        Handles agents of the dispatcher.
1009
1010        Appropriate Agents are added to the dispatcher through
1011        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs the agent's task through
        agent.tick() leading to:
            agent.start
                prolog -> AgentTask's prolog
                          For each queue entry:
                            sets host status/status to Running
                            sets started_on in afe_host_queue_entries
                run    -> AgentTask's run
                          Creates PidfileRunMonitor
                          Queues the autoserv command line for this AgentTask
                          via the drone manager. These commands are executed
                          through the drone manager's execute_actions.
                poll   -> AgentTask's/BaseAgentTask's poll
                          checks the monitor's exit_code.
                          Executes epilog if task is finished.
                          Executes AgentTask's _finish_task
                finish_task is usually responsible for setting the status
                of the HQE/host, and updating its active and complete fields.

            agent.is_done
                Removes the agent from the dispatcher's _agents queue.
                is_done checks the finished bit on the agent, which is
                set based on the Agent's task. During the agent's poll
                we check to see if the monitor process has exited in
                its finish method, and set the success member of the
                task based on this exit code.
1038        """
1039        num_started_this_tick = 0
1040        num_finished_this_tick = 0
1041        have_reached_limit = False
1042        # iterate over copy, so we can remove agents during iteration
1043        logging.debug('Handling %d Agents', len(self._agents))
1044        for agent in list(self._agents):
1045            self._log_extra_msg('Processing Agent with Host Ids: %s and '
1046                                'queue_entry ids:%s' % (agent.host_ids,
1047                                agent.queue_entry_ids))
1048            if not agent.started:
1049                if not self._can_start_agent(agent, have_reached_limit):
1050                    have_reached_limit = True
1051                    logging.debug('Reached Limit of allowed running Agents.')
1052                    continue
1053                num_started_this_tick += agent.task.num_processes
1054                self._log_extra_msg('Starting Agent')
1055            agent.tick()
1056            self._log_extra_msg('Agent tick completed.')
1057            if agent.is_done():
1058                num_finished_this_tick += agent.task.num_processes
1059                self._log_extra_msg("Agent finished")
1060                self.remove_agent(agent)
1061
1062        metrics.Counter(
1063            'chromeos/autotest/scheduler/agent_processes_started'
1064        ).increment_by(num_started_this_tick)
1065        metrics.Counter(
1066            'chromeos/autotest/scheduler/agent_processes_finished'
1067        ).increment_by(num_finished_this_tick)
1068        num_agent_processes = _drone_manager.total_running_processes()
1069        metrics.Gauge(
1070            'chromeos/autotest/scheduler/agent_processes'
1071        ).set(num_agent_processes)
1072        logging.info('%d running processes. %d added this tick.',
1073                     num_agent_processes, num_started_this_tick)
1074
1075
1076    def _log_tick_msg(self, msg):
1077        if self._tick_debug:
1078            logging.debug(msg)
1079
1080
1081    def _log_extra_msg(self, msg):
1082        if self._extra_debugging:
1083            logging.debug(msg)
1084
1085
1086SiteDispatcher = utils.import_site_class(
1087    __file__, 'autotest_lib.scheduler.site_monitor_db',
1088    'SiteDispatcher', BaseDispatcher)
1089
1090class Dispatcher(SiteDispatcher):
1091    pass
1092
1093
1094class Agent(object):
1095    """
1096    An agent for use by the Dispatcher class to perform a task.  An agent wraps
1097    around an AgentTask mainly to associate the AgentTask with the queue_entry
1098    and host ids.
1099
1100    The following methods are required on all task objects:
1101        poll() - Called periodically to let the task check its status and
                update its internal state.
1103        is_done() - Returns True if the task is finished.
1104        abort() - Called when an abort has been requested.  The task must
1105                set its aborted attribute to True if it actually aborted.
1106
1107    The following attributes are required on all task objects:
1108        aborted - bool, True if this task was aborted.
1109        success - bool, True if this task succeeded.
1110        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1111        host_ids - A sequence of Host ids this task represents.
1112    """
1113
1114
1115    def __init__(self, task):
1116        """
1117        @param task: An instance of an AgentTask.
1118        """
1119        self.task = task
1120
        # This is filled in by Dispatcher.add_agent_task()
1122        self.dispatcher = None
1123
1124        self.queue_entry_ids = task.queue_entry_ids
1125        self.host_ids = task.host_ids
1126
1127        self.started = False
1128        self.finished = False
1129
1130
1131    def tick(self):
1132        self.started = True
1133        if not self.finished:
1134            self.task.poll()
1135            if self.task.is_done():
1136                self.finished = True
1137
1138
1139    def is_done(self):
1140        return self.finished
1141
1142
1143    def abort(self):
1144        if self.task:
1145            self.task.abort()
1146            if self.task.aborted:
1147                # tasks can choose to ignore aborts
1148                self.finished = True
1149
1150
1151class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
1152    """
1153    Common functionality for QueueTask and HostlessQueueTask
1154    """
1155    def __init__(self, queue_entries):
1156        super(AbstractQueueTask, self).__init__()
1157        self.job = queue_entries[0].job
1158        self.queue_entries = queue_entries
1159
1160
1161    def _keyval_path(self):
1162        return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1163
1164
1165    def _write_control_file(self, execution_path):
1166        control_path = _drone_manager.attach_file_to_execution(
1167                execution_path, self.job.control_file)
1168        return control_path
1169
1170
1171    # TODO: Refactor into autoserv_utils. crbug.com/243090
1172    def _command_line(self):
1173        execution_path = self.queue_entries[0].execution_path()
1174        control_path = self._write_control_file(execution_path)
1175        hostnames = ','.join(entry.host.hostname
1176                             for entry in self.queue_entries
1177                             if not entry.is_hostless())
1178
1179        execution_tag = self.queue_entries[0].execution_tag()
1180        params = _autoserv_command_line(
1181            hostnames,
1182            ['-P', execution_tag, '-n',
1183             _drone_manager.absolute_path(control_path)],
1184            job=self.job, verbose=False)
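        # The extra args pass the execution tag via -P and the absolute path
        # of the control file copied to the drone via -n; the rest of the
        # invocation is assembled by autoserv_utils.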
1185        if self.job.is_image_update_job():
1186            params += ['--image', self.job.update_image_path]
1187
1188        return params
1189
1190
1191    @property
1192    def num_processes(self):
1193        return len(self.queue_entries)
1194
1195
1196    @property
1197    def owner_username(self):
1198        return self.job.owner
1199
1200
1201    def _working_directory(self):
1202        return self._get_consistent_execution_path(self.queue_entries)
1203
1204
1205    def prolog(self):
1206        queued_key, queued_time = self._job_queued_keyval(self.job)
1207        keyval_dict = self.job.keyval_dict()
1208        keyval_dict[queued_key] = queued_time
1209        self._write_keyvals_before_job(keyval_dict)
1210        for queue_entry in self.queue_entries:
1211            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
1212            queue_entry.set_started_on_now()
1213
1214
1215    def _write_lost_process_error_file(self):
1216        error_file_path = os.path.join(self._working_directory(), 'job_failure')
1217        _drone_manager.write_lines_to_file(error_file_path,
1218                                           [_LOST_PROCESS_ERROR])
1219
1220
1221    def _finish_task(self):
1222        if not self.monitor:
1223            return
1224
1225        self._write_job_finished()
1226
1227        if self.monitor.lost_process:
1228            self._write_lost_process_error_file()
1229
1230
1231    def _write_status_comment(self, comment):
1232        _drone_manager.write_lines_to_file(
1233            os.path.join(self._working_directory(), 'status.log'),
1234            ['INFO\t----\t----\t' + comment],
1235            paired_with_process=self.monitor.get_process())
1236
1237
1238    def _log_abort(self):
1239        if not self.monitor or not self.monitor.has_process():
1240            return
1241
1242        # build up sets of all the aborted_by and aborted_on values
1243        aborted_by, aborted_on = set(), set()
1244        for queue_entry in self.queue_entries:
1245            if queue_entry.aborted_by:
1246                aborted_by.add(queue_entry.aborted_by)
1247                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1248                aborted_on.add(t)
1249
1250        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete; we just need to
        # leave it in temporarily for backwards compatibility over upgrades.
        # Delete soon.
1254        assert len(aborted_by) <= 1
1255        if len(aborted_by) == 1:
1256            aborted_by_value = aborted_by.pop()
1257            aborted_on_value = max(aborted_on)
1258        else:
1259            aborted_by_value = 'autotest_system'
1260            aborted_on_value = int(time.time())
1261
1262        self._write_keyval_after_job("aborted_by", aborted_by_value)
1263        self._write_keyval_after_job("aborted_on", aborted_on_value)
1264
1265        aborted_on_string = str(datetime.datetime.fromtimestamp(
1266            aborted_on_value))
1267        self._write_status_comment('Job aborted by %s on %s' %
1268                                   (aborted_by_value, aborted_on_string))
1269
1270
1271    def abort(self):
1272        super(AbstractQueueTask, self).abort()
1273        self._log_abort()
1274        self._finish_task()
1275
1276
1277    def epilog(self):
1278        super(AbstractQueueTask, self).epilog()
1279        self._finish_task()
1280
1281
1282class QueueTask(AbstractQueueTask):
1283    def __init__(self, queue_entries):
1284        super(QueueTask, self).__init__(queue_entries)
1285        self._set_ids(queue_entries=queue_entries)
1286        self._enable_ssp_container = (
1287                global_config.global_config.get_config_value(
1288                        'AUTOSERV', 'enable_ssp_container', type=bool,
1289                        default=True))
1290
1291
1292    def prolog(self):
1293        self._check_queue_entry_statuses(
1294                self.queue_entries,
1295                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1296                                      models.HostQueueEntry.Status.RUNNING),
1297                allowed_host_statuses=(models.Host.Status.PENDING,
1298                                       models.Host.Status.RUNNING))
1299
1300        super(QueueTask, self).prolog()
1301
1302        for queue_entry in self.queue_entries:
1303            self._write_host_keyvals(queue_entry.host)
1304            queue_entry.host.set_status(models.Host.Status.RUNNING)
1305            queue_entry.host.update_field('dirty', 1)
1306
1307
1308    def _finish_task(self):
1309        super(QueueTask, self)._finish_task()
1310
1311        for queue_entry in self.queue_entries:
1312            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
1313            queue_entry.host.set_status(models.Host.Status.RUNNING)
1314
1315
1316    def _command_line(self):
1317        invocation = super(QueueTask, self)._command_line()
1318        # Check if server-side packaging is needed.
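        # require_ssp is tri-state: None or True enables server-side packaging
        # for server-type control files; only an explicit False opts out.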
1319        if (self._enable_ssp_container and
1320            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp is not False):
1322            invocation += ['--require-ssp']
1323            keyval_dict = self.job.keyval_dict()
1324            test_source_build = keyval_dict.get('test_source_build', None)
1325            if test_source_build:
1326                invocation += ['--test_source_build', test_source_build]
1327        if self.job.parent_job_id:
1328            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
1329        return invocation + ['--verify_job_repo_url']
1330
1331
1332class HostlessQueueTask(AbstractQueueTask):
1333    def __init__(self, queue_entry):
1334        super(HostlessQueueTask, self).__init__([queue_entry])
1335        self.queue_entry_ids = [queue_entry.id]
1336
1337
1338    def prolog(self):
1339        super(HostlessQueueTask, self).prolog()
1340
1341
1342    def _finish_task(self):
1343        super(HostlessQueueTask, self)._finish_task()
1344
        # When a job is added to the database, its initial status is always
        # Starting. In a scheduler tick, the scheduler finds all jobs in
        # Starting status and checks if any of them can be started. If the
        # scheduler hits some limit, e.g., max_hostless_jobs_per_drone, it
        # will leave these jobs in Starting status. Otherwise, the jobs'
        # status will be changed to Running, and an autoserv process
        # will be started in a drone for each of these jobs.
        # If the entry is still in status Starting, the process has not
        # started yet. Therefore, there is no need to parse and collect logs.
        # Without this check, an exception would be raised by the scheduler,
        # as execution_subdir for this queue entry does not yet have a value.
1356        hqe = self.queue_entries[0]
1357        if hqe.status != models.HostQueueEntry.Status.STARTING:
1358            hqe.set_status(models.HostQueueEntry.Status.PARSING)
1359
1360
1361if __name__ == '__main__':
1362    main()
1363