1#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8"""Host scheduler.
9
10If run as a standalone service, the host scheduler ensures the following:
11    1. Hosts will not be assigned to multiple hqes simultaneously. The process
12       of assignment in this case refers to the modification of the host_id
13       column of a row in the afe_host_queue_entries table, to reflect the host
14       id of a leased host matching the dependencies of the job.
15    2. Hosts that are not being used by active hqes or incomplete special tasks
16       will be released back to the available hosts pool, for acquisition by
17       subsequent hqes.
18In addition to these guarantees, the host scheduler also confirms that no 2
19active hqes/special tasks are assigned the same host, and sets the leased bit
20for hosts needed by frontend special tasks. The need for the latter is only
21apparent when viewed in the context of the job-scheduler (monitor_db), which
22runs special tasks only after their hosts have been leased.
23
24** Suport minimum duts requirement for suites (non-inline mode) **
25
26Each suite can specify the minimum number of duts it requires by
27dropping a 'suite_min_duts' job keyval which defaults to 0.
28
29When suites are competing for duts, if any suite has not got minimum duts
30it requires, the host scheduler will try to meet the requirement first,
31even if other suite may have higher priority or earlier timestamp. Once
32all suites' minimum duts requirement have been fullfilled, the host
33scheduler will allocate the rest of duts based on job priority and suite job id.
34This is to prevent low priority suites from starving when sharing pool with
35high-priority suites.
36
37Note:
38    1. Prevent potential starvation:
39       We need to carefully choose |suite_min_duts| for both low and high
40       priority suites. If a high priority suite didn't specify it but a low
41       priority one does, the high priority suite can be starved!
42    2. Restart requirement:
43       Restart host scheduler if you manually released a host by setting
44       leased=0 in db. This is needed because host scheduler maintains internal
45       state of host assignment for suites.
46    3. Exchanging duts triggers provisioning:
47       TODO(fdeng): There is a chance two suites can exchange duts,
48       if the two suites are for different builds, the exchange
49       will trigger provisioning. This can be optimized by preferring getting
50       hosts with the same build.
51"""
52
53import argparse
54import collections
55import datetime
56import logging
57import os
58import signal
59import sys
60import time
61
62import common
63from autotest_lib.frontend import setup_django_environment
64
65from autotest_lib.client.common_lib import global_config
66from autotest_lib.client.common_lib import utils
67from autotest_lib.scheduler import email_manager
68from autotest_lib.scheduler import query_managers
69from autotest_lib.scheduler import rdb_lib
70from autotest_lib.scheduler import rdb_utils
71from autotest_lib.scheduler import scheduler_lib
72from autotest_lib.scheduler import scheduler_models
73from autotest_lib.site_utils import job_overhead
74from autotest_lib.site_utils import metadata_reporter
75from autotest_lib.site_utils import server_manager_utils
76
77try:
78    from chromite.lib import metrics
79    from chromite.lib import ts_mon_config
80except ImportError:
81    metrics = utils.metrics_mock
82    ts_mon_config = utils.metrics_mock
83
84
85_db_manager = None
86_shutdown = False
87_tick_pause_sec = global_config.global_config.get_config_value(
88        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
89_monitor_db_host_acquisition = global_config.global_config.get_config_value(
90        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
91_METRICS_PREFIX = 'chromeos/autotest/host_scheduler'
92
93class SuiteRecorder(object):
94    """Recording the host assignment for suites.
95
96    The recorder holds two things:
97        * suite_host_num, records how many duts a suite is holding,
98          which is a map <suite_job_id -> num_of_hosts>
99        * hosts_to_suites, records which host is assigned to which
100          suite, it is a map <host_id -> suite_job_id>
101    The two datastructure got updated when a host is assigned to or released
102    by a job.
103
104    The reason to maintain hosts_to_suites is that, when a host is released,
105    we need to know which suite it was leased to. Querying the db for the
106    latest completed job that has run on a host is slow.  Therefore, we go with
107    an alternative: keeping a <host id, suite job id> map
108    in memory (for 10K hosts, the map should take less than 1M memory on
109    64-bit machine with python 2.7)
110
111    """
112
113
114    def __init__(self, job_query_manager):
115        """Initialize.
116
117        @param job_queue_manager: A JobQueueryManager object.
118        """
119        self.job_query_manager = job_query_manager
120        self.suite_host_num, self.hosts_to_suites = (
121                self.job_query_manager.get_suite_host_assignment())
122
123
124    def record_assignment(self, queue_entry):
125        """Record that the hqe has got a host.
126
127        @param queue_entry: A scheduler_models.HostQueueEntry object which has
128                            got a host.
129        """
130        parent_id = queue_entry.job.parent_job_id
131        if not parent_id:
132            return
133        if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id:
134            logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
135                          'seems already recorded', queue_entry.id,
136                          parent_id, queue_entry.host.hostname)
137            return
138        num_hosts = self.suite_host_num.get(parent_id, 0)
139        self.suite_host_num[parent_id] = num_hosts + 1
140        self.hosts_to_suites[queue_entry.host_id] = parent_id
141        logging.debug('Suite %d got host %s, currently holding %d hosts',
142                      parent_id, queue_entry.host.hostname,
143                      self.suite_host_num[parent_id])
144
145
146    def record_release(self, hosts):
147        """Update the record with host releasing event.
148
149        @param hosts: A list of scheduler_models.Host objects.
150        """
151        for host in hosts:
152            if host.id in self.hosts_to_suites:
153                parent_job_id = self.hosts_to_suites.pop(host.id)
154                count = self.suite_host_num[parent_job_id] - 1
155                if count == 0:
156                    del self.suite_host_num[parent_job_id]
157                else:
158                    self.suite_host_num[parent_job_id] = count
159                logging.debug(
160                        'Suite %d releases host %s, currently holding %d hosts',
161                        parent_job_id, host.hostname, count)
162
163
164    def get_min_duts(self, suite_job_ids):
165        """Figure out min duts to request.
166
167        Given a set ids of suite jobs, figure out minimum duts to request for
168        each suite. It is determined by two factors: min_duts specified
169        for each suite in its job keyvals, and how many duts a suite is
170        currently holding.
171
172        @param suite_job_ids: A set of suite job ids.
173
174        @returns: A dictionary, the key is suite_job_id, the value
175                  is the minimum number of duts to request.
176        """
177        suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
178                suite_job_ids)
179        for parent_id in suite_job_ids:
180            min_duts = suite_min_duts.get(parent_id, 0)
181            cur_duts = self.suite_host_num.get(parent_id, 0)
182            suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
183        logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
184                      suite_min_duts)
185        return suite_min_duts
186
187
188class BaseHostScheduler(object):
189    """Base class containing host acquisition logic.
190
191    This class contains all the core host acquisition logic needed by the
192    scheduler to run jobs on hosts. It is only capable of releasing hosts
193    back to the rdb through its tick, any other action must be instigated by
194    the job scheduler.
195    """
196
197
198    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])
199
200
201    def __init__(self):
202        self.host_query_manager = query_managers.AFEHostQueryManager()
203
204
205    def _release_hosts(self):
206        """Release hosts to the RDB.
207
208        Release all hosts that are ready and are currently not being used by an
209        active hqe, and don't have a new special task scheduled against them.
210
211        @return a list of hosts that are released.
212        """
213        release_hosts = self.host_query_manager.find_unused_healty_hosts()
214        release_hostnames = [host.hostname for host in release_hosts]
215        if release_hostnames:
216            self.host_query_manager.set_leased(
217                    False, hostname__in=release_hostnames)
218        return release_hosts
219
220
221    @classmethod
222    def schedule_host_job(cls, host, queue_entry):
223        """Schedule a job on a host.
224
225        Scheduling a job involves:
226            1. Setting the active bit on the queue_entry.
227            2. Scheduling a special task on behalf of the queue_entry.
228        Performing these actions will lead the job scheduler through a chain of
229        events, culminating in running the test and collecting results from
230        the host.
231
232        @param host: The host against which to schedule the job.
233        @param queue_entry: The queue_entry to schedule.
234        """
235        if queue_entry.host_id is None:
236            queue_entry.set_host(host)
237        elif host.id != queue_entry.host_id:
238                raise rdb_utils.RDBException('The rdb returned host: %s '
239                        'but the job:%s was already assigned a host: %s. ' %
240                        (host.hostname, queue_entry.job_id,
241                         queue_entry.host.hostname))
242        queue_entry.update_field('active', True)
243
244        # TODO: crbug.com/373936. The host scheduler should only be assigning
245        # jobs to hosts, but the criterion we use to release hosts depends
246        # on it not being used by an active hqe. Since we're activating the
247        # hqe here, we also need to schedule its first prejob task. OTOH,
248        # we could converge to having the host scheduler manager all special
249        # tasks, since their only use today is to verify/cleanup/reset a host.
250        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
251        queue_entry.schedule_pre_job_tasks()
252
253
254    def acquire_hosts(self, host_jobs):
255        """Accquire hosts for given jobs.
256
257        This method sends jobs that need hosts to rdb.
258        Child class can override this method to pipe more args
259        to rdb.
260
261        @param host_jobs: A list of queue entries that either require hosts,
262            or require host assignment validation through the rdb.
263
264        @param return: A generator that yields an rdb_hosts.RDBClientHostWrapper
265                       for each host acquired on behalf of a queue_entry,
266                       or None if a host wasn't found.
267        """
268        return rdb_lib.acquire_hosts(host_jobs)
269
270
271    def find_hosts_for_jobs(self, host_jobs):
272        """Find and verify hosts for a list of jobs.
273
274        @param host_jobs: A list of queue entries that either require hosts,
275            or require host assignment validation through the rdb.
276        @return: A list of tuples of the form (host, queue_entry) for each
277            valid host-queue_entry assignment.
278        """
279        jobs_with_hosts = []
280        hosts = self.acquire_hosts(host_jobs)
281        for host, job in zip(hosts, host_jobs):
282            if host:
283                jobs_with_hosts.append(self.host_assignment(host, job))
284        return jobs_with_hosts
285
286
287    def tick(self):
288        """Schedule core host management activities."""
289        self._release_hosts()
290
291
292class HostScheduler(BaseHostScheduler):
293    """A scheduler capable managing host acquisition for new jobs."""
294
295
296    def __init__(self):
297        super(HostScheduler, self).__init__()
298        self.job_query_manager = query_managers.AFEJobQueryManager()
299        # Keeping track on how many hosts each suite is holding
300        # {suite_job_id: num_hosts}
301        self._suite_recorder = SuiteRecorder(self.job_query_manager)
302
303
304    def _record_host_assignment(self, host, queue_entry):
305        """Record that |host| is assigned to |queue_entry|.
306
307        Record:
308            1. How long it takes to assign a host to a job in metadata db.
309            2. Record host assignment of a suite.
310
311        @param host: A Host object.
312        @param queue_entry: A HostQueueEntry object.
313        """
314        secs_in_queued = (datetime.datetime.now() -
315                          queue_entry.job.created_on).total_seconds()
316        job_overhead.record_state_duration(
317                queue_entry.job_id, host.hostname,
318                job_overhead.STATUS.QUEUED, secs_in_queued)
319        self._suite_recorder.record_assignment(queue_entry)
320
321
322    @metrics.SecondsTimerDecorator(
323            '%s/schedule_jobs_duration' % _METRICS_PREFIX)
324    def _schedule_jobs(self):
325        """Schedule new jobs against hosts."""
326
327        new_jobs_with_hosts = 0
328        queue_entries = self.job_query_manager.get_pending_queue_entries(
329                only_hostless=False)
330        unverified_host_jobs = [job for job in queue_entries
331                                if not job.is_hostless()]
332        if not unverified_host_jobs:
333            return
334        for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
335            self.schedule_host_job(acquisition.host, acquisition.job)
336            self._record_host_assignment(acquisition.host, acquisition.job)
337            new_jobs_with_hosts += 1
338        metrics.Counter('%s/new_jobs_with_hosts' % _METRICS_PREFIX
339                        ).increment_by(new_jobs_with_hosts)
340
341        num_jobs_without_hosts = len(unverified_host_jobs) - new_jobs_with_hosts
342        metrics.Gauge('%s/current_jobs_without_hosts' % _METRICS_PREFIX
343                      ).set(num_jobs_without_hosts)
344        metrics.Counter('%s/tick' % _METRICS_PREFIX).increment()
345
346
347    @metrics.SecondsTimerDecorator('%s/lease_hosts_duration' % _METRICS_PREFIX)
348    def _lease_hosts_of_frontend_tasks(self):
349        """Lease hosts of tasks scheduled through the frontend."""
350        # We really don't need to get all the special tasks here, just the ones
351        # without hqes, but reusing the method used by the scheduler ensures
352        # we prioritize the same way.
353        lease_hostnames = [
354                task.host.hostname for task in
355                self.job_query_manager.get_prioritized_special_tasks(
356                    only_tasks_with_leased_hosts=False)
357                if task.queue_entry_id is None and not task.host.leased]
358        # Leasing a leased hosts here shouldn't be a problem:
359        # 1. The only way a host can be leased is if it's been assigned to
360        #    an active hqe or another similar frontend task, but doing so will
361        #    have already precluded it from the list of tasks returned by the
362        #    job_query_manager.
363        # 2. The unleasing is done based on global conditions. Eg: Even if a
364        #    task has already leased a host and we lease it again, the
365        #    host scheduler won't release the host till both tasks are complete.
366        if lease_hostnames:
367            self.host_query_manager.set_leased(
368                    True, hostname__in=lease_hostnames)
369
370
371    def acquire_hosts(self, host_jobs):
372        """Override acquire_hosts.
373
374        This method overrides the method in parent class.
375        It figures out a set of suites that |host_jobs| belong to;
376        and get min_duts requirement for each suite.
377        It pipes min_duts for each suite to rdb.
378
379        """
380        parent_job_ids = set([q.job.parent_job_id
381                              for q in host_jobs if q.job.parent_job_id])
382        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
383        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)
384
385
386    @metrics.SecondsTimerDecorator('%s/tick_time' % _METRICS_PREFIX)
387    def tick(self):
388        logging.info('Calling new tick.')
389        logging.info('Leasing hosts for frontend tasks.')
390        self._lease_hosts_of_frontend_tasks()
391        logging.info('Finding hosts for new jobs.')
392        self._schedule_jobs()
393        logging.info('Releasing unused hosts.')
394        released_hosts = self._release_hosts()
395        logging.info('Updating suite assignment with released hosts')
396        self._suite_recorder.record_release(released_hosts)
397        logging.info('Calling email_manager.')
398        email_manager.manager.send_queued_emails()
399
400
401class DummyHostScheduler(BaseHostScheduler):
402    """A dummy host scheduler that doesn't acquire or release hosts."""
403
404    def __init__(self):
405        pass
406
407
408    def tick(self):
409        pass
410
411
412def handle_signal(signum, frame):
413    """Sigint handler so we don't crash mid-tick."""
414    global _shutdown
415    _shutdown = True
416    logging.info("Shutdown request received.")
417
418
419def initialize(testing=False):
420    """Initialize the host scheduler."""
421    if testing:
422        # Don't import testing utilities unless we're in testing mode,
423        # as the database imports have side effects.
424        from autotest_lib.scheduler import rdb_testing_utils
425        rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing(
426                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
427    global _db_manager
428    _db_manager = scheduler_lib.ConnectionManager()
429    scheduler_lib.setup_logging(
430            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
431            None, timestamped_logfile_prefix='host_scheduler')
432    logging.info("Setting signal handler")
433    signal.signal(signal.SIGINT, handle_signal)
434    signal.signal(signal.SIGTERM, handle_signal)
435    scheduler_models.initialize()
436
437
438def parse_arguments(argv):
439    """
440    Parse command line arguments
441
442    @param argv: argument list to parse
443    @returns:    parsed arguments.
444    """
445    parser = argparse.ArgumentParser(description='Host scheduler.')
446    parser.add_argument('--testing', action='store_true', default=False,
447                        help='Start the host scheduler in testing mode.')
448    parser.add_argument('--production',
449                        help=('Indicate that scheduler is running in production'
450                              ' environment and it can use database that is not'
451                              ' hosted in localhost. If it is set to False, '
452                              'scheduler will fail if database is not in '
453                              'localhost.'),
454                        action='store_true', default=False)
455    options = parser.parse_args(argv)
456
457    return options
458
459
460def main():
461    if _monitor_db_host_acquisition:
462        logging.info('Please set inline_host_acquisition=False in the shadow '
463                     'config before starting the host scheduler.')
464        # The upstart job for the host scheduler understands exit(0) to mean
465        # 'don't respawn'. This is desirable when the job scheduler is acquiring
466        # hosts inline.
467        sys.exit(0)
468    try:
469        options = parse_arguments(sys.argv[1:])
470        scheduler_lib.check_production_settings(options)
471
472        # If server database is enabled, check if the server has role
473        # `host_scheduler`. If the server does not have host_scheduler role,
474        # exception will be raised and host scheduler will not continue to run.
475        if server_manager_utils.use_server_db():
476            server_manager_utils.confirm_server_has_role(hostname='localhost',
477                                                         role='host_scheduler')
478
479        initialize(options.testing)
480
481        # Start the thread to report metadata.
482        metadata_reporter.start()
483
484        ts_mon_config.SetupTsMonGlobalState('autotest_host_scheduler')
485
486        host_scheduler = HostScheduler()
487        minimum_tick_sec = global_config.global_config.get_config_value(
488                'SCHEDULER', 'minimum_tick_sec', type=float)
489        while not _shutdown:
490            start = time.time()
491            host_scheduler.tick()
492            curr_tick_sec = time.time() - start
493            if (minimum_tick_sec > curr_tick_sec):
494                time.sleep(minimum_tick_sec - curr_tick_sec)
495            else:
496                time.sleep(0.0001)
497    except server_manager_utils.ServerActionError as e:
498        # This error is expected when the server is not in primary status
499        # for host-scheduler role. Thus do not send email for it.
500        raise
501    except Exception:
502        email_manager.manager.log_stacktrace(
503                'Uncaught exception; terminating host_scheduler.')
504        raise
505    finally:
506        email_manager.manager.send_queued_emails()
507        if _db_manager:
508            _db_manager.disconnect()
509        metadata_reporter.abort()
510
511
512if __name__ == '__main__':
513    main()
514