#!/usr/bin/python
#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Host scheduler.

If run as a standalone service, the host scheduler ensures the following:
    1. Hosts will not be assigned to multiple hqes simultaneously. The process
       of assignment in this case refers to the modification of the host_id
       column of a row in the afe_host_queue_entries table, to reflect the host
       id of a leased host matching the dependencies of the job.
    2. Hosts that are not being used by active hqes or incomplete special tasks
       will be released back to the available hosts pool, for acquisition by
       subsequent hqes.
In addition to these guarantees, the host scheduler also confirms that no two
active hqes/special tasks are assigned the same host, and sets the leased bit
for hosts needed by frontend special tasks. The need for the latter is only
apparent when viewed in the context of the job-scheduler (monitor_db), which
runs special tasks only after their hosts have been leased.

** Support minimum duts requirement for suites (non-inline mode) **

Each suite can specify the minimum number of duts it requires by
dropping a 'suite_min_duts' job keyval, which defaults to 0.

When suites compete for duts, any suite that has not yet received the
minimum number of duts it requires is satisfied first, even if other
suites have a higher priority or an earlier timestamp. Once the minimum
duts requirements of all suites have been fulfilled, the host scheduler
allocates the remaining duts based on job priority and suite job id.
This prevents low-priority suites from starving when they share a pool
with high-priority suites.
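
A worked example with made-up numbers: suppose suite A (high priority)
declares suite_min_duts=2 and holds no duts, while suite B (low priority)
declares suite_min_duts=3 and already holds 1 dut. On the next round of
host acquisition, the scheduler first asks the rdb for 2 duts for A and 2
for B (each suite's min_duts minus the duts it already holds, never
negative); only after these shortfalls are covered are the remaining duts
handed out by job priority and suite job id.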

Note:
    1. Prevent potential starvation:
       We need to carefully choose |suite_min_duts| for both low and high
       priority suites. If a high priority suite doesn't specify it but a low
       priority one does, the high priority suite can be starved!
    2. Restart requirement:
       Restart the host scheduler if you manually release a host by setting
       leased=0 in the db. This is needed because the host scheduler maintains
       internal state of host assignments for suites.
    3. Exchanging duts triggers provisioning:
       TODO(fdeng): There is a chance two suites can exchange duts. If the
       two suites are for different builds, the exchange will trigger
       provisioning. This can be optimized by preferring hosts with the
       same build.
"""

import argparse
import collections
import datetime
import logging
import os
import signal
import sys
import time

import common
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment

# These imports need to come before the remaining autotest imports below to
# avoid using autotest's version of httplib2, which is out of date.
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock

from autotest_lib.client.common_lib import global_config
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_utils
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.site_utils import job_overhead
from autotest_lib.site_utils import server_manager_utils


_db_manager = None
_shutdown = False
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
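
# A minimal, illustrative sketch of the [SCHEDULER] config this service reads
# (the keys match the get_config_value() calls above and in main(); the values
# shown are placeholders, not recommendations):
#
#   [SCHEDULER]
#   inline_host_acquisition: False
#   tick_pause_sec: 5
#   host_scheduler_minimum_tick_sec: 10
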
_METRICS_PREFIX = 'chromeos/autotest/host_scheduler'

class SuiteRecorder(object):
    """Records the host assignments of suites.

    The recorder holds two things:
        * suite_host_num, which records how many duts a suite is holding;
          it is a map <suite_job_id -> num_of_hosts>
        * hosts_to_suites, which records which host is assigned to which
          suite; it is a map <host_id -> suite_job_id>
    The two data structures are updated whenever a host is assigned to or
    released by a job.

    The reason to maintain hosts_to_suites is that, when a host is released,
    we need to know which suite it was leased to. Querying the db for the
    latest completed job that has run on a host is slow. Therefore, we go with
    an alternative: keeping a <host id, suite job id> map in memory (for 10K
    hosts, the map should take less than 1M of memory on a 64-bit machine
    with Python 2.7).
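
    An illustrative example with made-up ids: if suite job 1000 holds hosts
    5 and 6, and suite job 2000 holds host 7, the recorder's state is:
        suite_host_num == {1000: 2, 2000: 1}
        hosts_to_suites == {5: 1000, 6: 1000, 7: 2000}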

    """


    def __init__(self, job_query_manager):
        """Initialize.

        @param job_query_manager: A query_managers.AFEJobQueryManager object.
        """
        self.job_query_manager = job_query_manager
        self.suite_host_num, self.hosts_to_suites = (
                self.job_query_manager.get_suite_host_assignment())


    def record_assignment(self, queue_entry):
        """Record that the hqe has been assigned a host.

        @param queue_entry: A scheduler_models.HostQueueEntry object that has
                            been assigned a host.
        """
        parent_id = queue_entry.job.parent_job_id
        if not parent_id:
            return
        if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id:
            logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
                          'seems already recorded', queue_entry.id,
                          parent_id, queue_entry.host.hostname)
            return
        num_hosts = self.suite_host_num.get(parent_id, 0)
        self.suite_host_num[parent_id] = num_hosts + 1
        self.hosts_to_suites[queue_entry.host_id] = parent_id
        logging.debug('Suite %d got host %s, currently holding %d hosts',
                      parent_id, queue_entry.host.hostname,
                      self.suite_host_num[parent_id])


    def record_release(self, hosts):
        """Update the record when hosts are released.

        @param hosts: A list of scheduler_models.Host objects.
        """
        for host in hosts:
            if host.id in self.hosts_to_suites:
                parent_job_id = self.hosts_to_suites.pop(host.id)
                count = self.suite_host_num[parent_job_id] - 1
                if count == 0:
                    del self.suite_host_num[parent_job_id]
                else:
                    self.suite_host_num[parent_job_id] = count
                logging.debug(
                        'Suite %d releases host %s, currently holding %d hosts',
                        parent_job_id, host.hostname, count)


    def get_min_duts(self, suite_job_ids):
        """Figure out the minimum duts to request for each suite.

        Given a set of suite job ids, figure out the minimum number of duts to
        request for each suite. It is determined by two factors: the min_duts
        specified for each suite in its job keyvals, and how many duts the
        suite is currently holding.

        @param suite_job_ids: A set of suite job ids.

        @returns: A dictionary mapping suite_job_id to the minimum number of
                  duts to request.
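
        An illustrative example with made-up numbers: a suite that declared
        suite_min_duts=4 and currently holds 3 duts gets an entry of 1
        (max(0, 4 - 3)); a suite already holding at least its minimum gets 0.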
        """
        suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
                suite_job_ids)
        for parent_id in suite_job_ids:
            min_duts = suite_min_duts.get(parent_id, 0)
            cur_duts = self.suite_host_num.get(parent_id, 0)
            suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
        logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
                      suite_min_duts)
        return suite_min_duts


class BaseHostScheduler(object):
    """Base class containing host acquisition logic.

    This class contains all the core host acquisition logic needed by the
    scheduler to run jobs on hosts. It is only capable of releasing hosts
    back to the rdb through its tick; any other action must be instigated by
    the job scheduler.
    """


    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])


    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()


    def _release_hosts(self):
        """Release hosts to the RDB.

        Release all hosts that are ready and are currently not being used by an
        active hqe, and don't have a new special task scheduled against them.

        @return: A list of the released hosts.
        """
        release_hosts = self.host_query_manager.find_unused_healty_hosts()
        release_hostnames = [host.hostname for host in release_hosts]
        if release_hostnames:
            self.host_query_manager.set_leased(
                    False, hostname__in=release_hostnames)
        return release_hosts


    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.

        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain of
        events, culminating in running the test and collecting results from
        the host.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.
        """
        if queue_entry.host_id is None:
            queue_entry.set_host(host)
        elif host.id != queue_entry.host_id:
            raise rdb_utils.RDBException('The rdb returned host: %s '
                    'but the job:%s was already assigned a host: %s. ' %
                    (host.hostname, queue_entry.job_id,
                     queue_entry.host.hostname))
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion we use to release hosts depends
        # on it not being used by an active hqe. Since we're activating the
        # hqe here, we also need to schedule its first prejob task. OTOH,
        # we could converge to having the host scheduler manage all special
        # tasks, since their only use today is to verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def acquire_hosts(self, host_jobs):
        """Acquire hosts for the given jobs.

        This method sends jobs that need hosts to the rdb.
        Child classes can override this method to pipe more args
        to the rdb.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.

        @return: A generator that yields an rdb_hosts.RDBClientHostWrapper
                 for each host acquired on behalf of a queue_entry,
                 or None if a host wasn't found.
        """
        return rdb_lib.acquire_hosts(host_jobs)


    def find_hosts_for_jobs(self, host_jobs):
        """Find and verify hosts for a list of jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A generator of host_assignment namedtuples of the form
            (host, job) for each valid host-queue_entry assignment.
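
        A minimal usage sketch, mirroring HostScheduler._schedule_jobs below:
            for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
                self.schedule_host_job(acquisition.host, acquisition.job)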
        """
        hosts = self.acquire_hosts(host_jobs)
        for host, job in zip(hosts, host_jobs):
            if host:
                yield self.host_assignment(host, job)


    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()


class HostScheduler(BaseHostScheduler):
    """A scheduler capable of managing host acquisition for new jobs."""


    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()
        # Keep track of how many hosts each suite is holding:
        # {suite_job_id: num_hosts}
        self._suite_recorder = SuiteRecorder(self.job_query_manager)


    def _record_host_assignment(self, host, queue_entry):
        """Record that |host| is assigned to |queue_entry|.

        This records:
            1. How long it took to assign a host to the job, in the metadata
               db.
            2. The host assignment of the suite.

        @param host: A Host object.
        @param queue_entry: A HostQueueEntry object.
        """
        secs_in_queued = (datetime.datetime.now() -
                          queue_entry.job.created_on).total_seconds()
        job_overhead.record_state_duration(
                queue_entry.job_id, host.hostname,
                job_overhead.STATUS.QUEUED, secs_in_queued)
        self._suite_recorder.record_assignment(queue_entry)


    @metrics.SecondsTimerDecorator(
            '%s/schedule_jobs_duration' % _METRICS_PREFIX)
    def _schedule_jobs(self):
        """Schedule new jobs against hosts."""

        new_jobs_with_hosts = 0
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        if unverified_host_jobs:
            for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
                self.schedule_host_job(acquisition.host, acquisition.job)
                self._record_host_assignment(acquisition.host, acquisition.job)
                new_jobs_with_hosts += 1
            metrics.Counter('%s/new_jobs_with_hosts' % _METRICS_PREFIX
                            ).increment_by(new_jobs_with_hosts)

        num_jobs_without_hosts = (len(unverified_host_jobs) -
                                  new_jobs_with_hosts)
        metrics.Gauge('%s/current_jobs_without_hosts' % _METRICS_PREFIX
                      ).set(num_jobs_without_hosts)

        metrics.Counter('%s/tick' % _METRICS_PREFIX).increment()

    @metrics.SecondsTimerDecorator('%s/lease_hosts_duration' % _METRICS_PREFIX)
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # We really don't need to get all the special tasks here, just the ones
        # without hqes, but reusing the method used by the scheduler ensures
        # we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                    only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing an already-leased host here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        #    task has already leased a host and we lease it again, the
        #    host scheduler won't release the host till both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)


    def acquire_hosts(self, host_jobs):
        """Override acquire_hosts.

        This method overrides the method in the parent class.
        It figures out the set of suites that |host_jobs| belong to,
        gets the min_duts requirement for each suite, and pipes the
        min_duts of each suite to the rdb.

        """
        parent_job_ids = set([q.job.parent_job_id
                              for q in host_jobs if q.job.parent_job_id])
        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)


    @metrics.SecondsTimerDecorator('%s/tick_time' % _METRICS_PREFIX)
    def tick(self):
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        released_hosts = self._release_hosts()
        logging.info('Updating suite assignment with released hosts')
        self._suite_recorder.record_release(released_hosts)
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()


class DummyHostScheduler(BaseHostScheduler):
    """A dummy host scheduler that doesn't acquire or release hosts."""

    def __init__(self):
        pass


    def tick(self):
        pass


def handle_signal(signum, frame):
    """Signal handler (SIGINT/SIGTERM) so we don't crash mid-tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize(testing=False):
    """Initialize the host scheduler."""
    if testing:
        # Don't import testing utilities unless we're in testing mode,
        # as the database imports have side effects.
        from autotest_lib.scheduler import rdb_testing_utils
        rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing(
                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='host_scheduler')
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)
    scheduler_models.initialize()


def parse_arguments(argv):
    """
    Parse command line arguments.

    @param argv: Argument list to parse.
    @returns: Parsed arguments.
    """
    parser = argparse.ArgumentParser(description='Host scheduler.')
    parser.add_argument('--testing', action='store_true', default=False,
                        help='Start the host scheduler in testing mode.')
    parser.add_argument('--production',
                        help=('Indicate that the scheduler is running in a '
                              'production environment and can use a database '
                              'that is not hosted on localhost. If not set, '
                              'the scheduler will fail if the database is not '
                              'on localhost.'),
                        action='store_true', default=False)
    parser.add_argument(
            '--lifetime-hours',
            type=float,
            default=None,
            help='If provided, number of hours the scheduler should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    parser.add_argument(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
            type=str,
            default=None,
    )
    options = parser.parse_args(argv)

    return options


def main():
    if _monitor_db_host_acquisition:
        logging.info('Please set inline_host_acquisition=False in the shadow '
                     'config before starting the host scheduler.')
        sys.exit(0)
    try:
        options = parse_arguments(sys.argv[1:])
        scheduler_lib.check_production_settings(options)

        # If the server database is enabled, check that this server has the
        # `host_scheduler` role. If it does not, an exception is raised and
        # the host scheduler will not continue to run.
        if server_manager_utils.use_server_db():
            server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                         role='host_scheduler')

        initialize(options.testing)

        with ts_mon_config.SetupTsMonGlobalState(
                'autotest_host_scheduler',
                indirect=True,
                debug_file=options.metrics_file,
        ):
            metrics.Counter('%s/start' % _METRICS_PREFIX).increment()
            process_start_time = time.time()
            host_scheduler = HostScheduler()
            minimum_tick_sec = global_config.global_config.get_config_value(
                    'SCHEDULER', 'host_scheduler_minimum_tick_sec', type=float)
            while not _shutdown:
                if _lifetime_expired(options.lifetime_hours,
                                     process_start_time):
                    break
                start = time.time()
                host_scheduler.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
            logging.info('Shutdown request received. Bye! Bye!')
    except server_manager_utils.ServerActionError:
        # This error is expected when the server is not in primary status
        # for the host-scheduler role, so do not send email for it.
        raise
    except Exception:
        metrics.Counter('%s/uncaught_exception' % _METRICS_PREFIX).increment()
        raise
    finally:
        email_manager.manager.send_queued_emails()
        if _db_manager:
            _db_manager.disconnect()


def _lifetime_expired(lifetime_hours, process_start_time):
    """Returns True if the process lifetime has expired, False otherwise.

    Also sets the global _shutdown so that any background processes also take
    the cue to exit.
    """
    if lifetime_hours is None:
        return False
    if time.time() - process_start_time > lifetime_hours * 3600:
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        global _shutdown
        _shutdown = True
        return True
    return False


if __name__ == '__main__':
    main()
