1#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8"""Host scheduler.
9
10If run as a standalone service, the host scheduler ensures the following:
11    1. Hosts will not be assigned to multiple hqes simultaneously. The process
12       of assignment in this case refers to the modification of the host_id
13       column of a row in the afe_host_queue_entries table, to reflect the host
14       id of a leased host matching the dependencies of the job.
15    2. Hosts that are not being used by active hqes or incomplete special tasks
16       will be released back to the available hosts pool, for acquisition by
17       subsequent hqes.
18In addition to these guarantees, the host scheduler also confirms that no 2
19active hqes/special tasks are assigned the same host, and sets the leased bit
20for hosts needed by frontend special tasks. The need for the latter is only
21apparent when viewed in the context of the job-scheduler (monitor_db), which
22runs special tasks only after their hosts have been leased.
23
24** Suport minimum duts requirement for suites (non-inline mode) **
25
26Each suite can specify the minimum number of duts it requires by
27dropping a 'suite_min_duts' job keyval which defaults to 0.
28
29When suites are competing for duts, if any suite has not got minimum duts
30it requires, the host scheduler will try to meet the requirement first,
31even if other suite may have higher priority or earlier timestamp. Once
32all suites' minimum duts requirement have been fullfilled, the host
33scheduler will allocate the rest of duts based on job priority and suite job id.
34This is to prevent low priority suites from starving when sharing pool with
35high-priority suites.
36
37Note:
38    1. Prevent potential starvation:
39       We need to carefully choose |suite_min_duts| for both low and high
40       priority suites. If a high priority suite didn't specify it but a low
41       priority one does, the high priority suite can be starved!
42    2. Restart requirement:
43       Restart host scheduler if you manually released a host by setting
44       leased=0 in db. This is needed because host scheduler maintains internal
45       state of host assignment for suites.
46    3. Exchanging duts triggers provisioning:
47       TODO(fdeng): There is a chance two suites can exchange duts,
48       if the two suites are for different builds, the exchange
49       will trigger provisioning. This can be optimized by preferring getting
50       hosts with the same build.
51"""
52
53import argparse
54import collections
55import datetime
56import logging
57import os
58import signal
59import sys
60import time
61
62import common
63from autotest_lib.frontend import setup_django_environment
64
65from autotest_lib.client.common_lib import global_config
66from autotest_lib.client.common_lib.cros.graphite import autotest_stats
67from autotest_lib.scheduler import email_manager
68from autotest_lib.scheduler import query_managers
69from autotest_lib.scheduler import rdb_lib
70from autotest_lib.scheduler import rdb_utils
71from autotest_lib.scheduler import scheduler_lib
72from autotest_lib.scheduler import scheduler_models
73from autotest_lib.site_utils import job_overhead
74from autotest_lib.site_utils import metadata_reporter
75from autotest_lib.site_utils import server_manager_utils
76
77_db_manager = None
78_shutdown = False
79_tick_pause_sec = global_config.global_config.get_config_value(
80        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
81_monitor_db_host_acquisition = global_config.global_config.get_config_value(
82        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
83
84
85class SuiteRecorder(object):
86    """Recording the host assignment for suites.
87
88    The recorder holds two things:
89        * suite_host_num, records how many duts a suite is holding,
90          which is a map <suite_job_id -> num_of_hosts>
91        * hosts_to_suites, records which host is assigned to which
92          suite, it is a map <host_id -> suite_job_id>
93    The two datastructure got updated when a host is assigned to or released
94    by a job.
95
96    The reason to maintain hosts_to_suites is that, when a host is released,
97    we need to know which suite it was leased to. Querying the db for the
98    latest completed job that has run on a host is slow.  Therefore, we go with
99    an alternative: keeping a <host id, suite job id> map
100    in memory (for 10K hosts, the map should take less than 1M memory on
101    64-bit machine with python 2.7)
102
103    """
104
105
106    _timer = autotest_stats.Timer('suite_recorder')
107
108
109    def __init__(self, job_query_manager):
110        """Initialize.
111
112        @param job_queue_manager: A JobQueueryManager object.
113        """
114        self.job_query_manager = job_query_manager
115        self.suite_host_num, self.hosts_to_suites = (
116                self.job_query_manager.get_suite_host_assignment())
117
118
119    def record_assignment(self, queue_entry):
120        """Record that the hqe has got a host.
121
122        @param queue_entry: A scheduler_models.HostQueueEntry object which has
123                            got a host.
124        """
125        parent_id = queue_entry.job.parent_job_id
126        if not parent_id:
127            return
128        if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id:
129            logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
130                          'seems already recorded', queue_entry.id,
131                          parent_id, queue_entry.host.hostname)
132            return
133        num_hosts = self.suite_host_num.get(parent_id, 0)
134        self.suite_host_num[parent_id] = num_hosts + 1
135        self.hosts_to_suites[queue_entry.host_id] = parent_id
136        logging.debug('Suite %d got host %s, currently holding %d hosts',
137                      parent_id, queue_entry.host.hostname,
138                      self.suite_host_num[parent_id])
139
140
141    def record_release(self, hosts):
142        """Update the record with host releasing event.
143
144        @param hosts: A list of scheduler_models.Host objects.
145        """
146        for host in hosts:
147            if host.id in self.hosts_to_suites:
148                parent_job_id = self.hosts_to_suites.pop(host.id)
149                count = self.suite_host_num[parent_job_id] - 1
150                if count == 0:
151                    del self.suite_host_num[parent_job_id]
152                else:
153                    self.suite_host_num[parent_job_id] = count
154                logging.debug(
155                        'Suite %d releases host %s, currently holding %d hosts',
156                        parent_job_id, host.hostname, count)
157
158
159    def get_min_duts(self, suite_job_ids):
160        """Figure out min duts to request.
161
162        Given a set ids of suite jobs, figure out minimum duts to request for
163        each suite. It is determined by two factors: min_duts specified
164        for each suite in its job keyvals, and how many duts a suite is
165        currently holding.
166
167        @param suite_job_ids: A set of suite job ids.
168
169        @returns: A dictionary, the key is suite_job_id, the value
170                  is the minimum number of duts to request.
171        """
172        suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
173                suite_job_ids)
174        for parent_id in suite_job_ids:
175            min_duts = suite_min_duts.get(parent_id, 0)
176            cur_duts = self.suite_host_num.get(parent_id, 0)
177            suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
178        logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
179                      suite_min_duts)
180        return suite_min_duts
181
182
183class BaseHostScheduler(object):
184    """Base class containing host acquisition logic.
185
186    This class contains all the core host acquisition logic needed by the
187    scheduler to run jobs on hosts. It is only capable of releasing hosts
188    back to the rdb through its tick, any other action must be instigated by
189    the job scheduler.
190    """
191
192
193    _timer = autotest_stats.Timer('base_host_scheduler')
194    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])
195
196
197    def __init__(self):
198        self.host_query_manager = query_managers.AFEHostQueryManager()
199
200
201    @_timer.decorate
202    def _release_hosts(self):
203        """Release hosts to the RDB.
204
205        Release all hosts that are ready and are currently not being used by an
206        active hqe, and don't have a new special task scheduled against them.
207
208        @return a list of hosts that are released.
209        """
210        release_hosts = self.host_query_manager.find_unused_healty_hosts()
211        release_hostnames = [host.hostname for host in release_hosts]
212        if release_hostnames:
213            self.host_query_manager.set_leased(
214                    False, hostname__in=release_hostnames)
215        return release_hosts
216
217
218    @classmethod
219    def schedule_host_job(cls, host, queue_entry):
220        """Schedule a job on a host.
221
222        Scheduling a job involves:
223            1. Setting the active bit on the queue_entry.
224            2. Scheduling a special task on behalf of the queue_entry.
225        Performing these actions will lead the job scheduler through a chain of
226        events, culminating in running the test and collecting results from
227        the host.
228
229        @param host: The host against which to schedule the job.
230        @param queue_entry: The queue_entry to schedule.
231        """
232        if queue_entry.host_id is None:
233            queue_entry.set_host(host)
234        elif host.id != queue_entry.host_id:
235                raise rdb_utils.RDBException('The rdb returned host: %s '
236                        'but the job:%s was already assigned a host: %s. ' %
237                        (host.hostname, queue_entry.job_id,
238                         queue_entry.host.hostname))
239        queue_entry.update_field('active', True)
240
241        # TODO: crbug.com/373936. The host scheduler should only be assigning
242        # jobs to hosts, but the criterion we use to release hosts depends
243        # on it not being used by an active hqe. Since we're activating the
244        # hqe here, we also need to schedule its first prejob task. OTOH,
245        # we could converge to having the host scheduler manager all special
246        # tasks, since their only use today is to verify/cleanup/reset a host.
247        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
248        queue_entry.schedule_pre_job_tasks()
249
250
251    def acquire_hosts(self, host_jobs):
252        """Accquire hosts for given jobs.
253
254        This method sends jobs that need hosts to rdb.
255        Child class can override this method to pipe more args
256        to rdb.
257
258        @param host_jobs: A list of queue entries that either require hosts,
259            or require host assignment validation through the rdb.
260
261        @param return: A generator that yields an rdb_hosts.RDBClientHostWrapper
262                       for each host acquired on behalf of a queue_entry,
263                       or None if a host wasn't found.
264        """
265        return rdb_lib.acquire_hosts(host_jobs)
266
267
268    def find_hosts_for_jobs(self, host_jobs):
269        """Find and verify hosts for a list of jobs.
270
271        @param host_jobs: A list of queue entries that either require hosts,
272            or require host assignment validation through the rdb.
273        @return: A list of tuples of the form (host, queue_entry) for each
274            valid host-queue_entry assignment.
275        """
276        jobs_with_hosts = []
277        hosts = self.acquire_hosts(host_jobs)
278        for host, job in zip(hosts, host_jobs):
279            if host:
280                jobs_with_hosts.append(self.host_assignment(host, job))
281        return jobs_with_hosts
282
283
284    @_timer.decorate
285    def tick(self):
286        """Schedule core host management activities."""
287        self._release_hosts()
288
289
290class HostScheduler(BaseHostScheduler):
291    """A scheduler capable managing host acquisition for new jobs."""
292
293    _timer = autotest_stats.Timer('host_scheduler')
294
295
296    def __init__(self):
297        super(HostScheduler, self).__init__()
298        self.job_query_manager = query_managers.AFEJobQueryManager()
299        # Keeping track on how many hosts each suite is holding
300        # {suite_job_id: num_hosts}
301        self._suite_recorder = SuiteRecorder(self.job_query_manager)
302
303
304    def _record_host_assignment(self, host, queue_entry):
305        """Record that |host| is assigned to |queue_entry|.
306
307        Record:
308            1. How long it takes to assign a host to a job in metadata db.
309            2. Record host assignment of a suite.
310
311        @param host: A Host object.
312        @param queue_entry: A HostQueueEntry object.
313        """
314        secs_in_queued = (datetime.datetime.now() -
315                          queue_entry.job.created_on).total_seconds()
316        job_overhead.record_state_duration(
317                queue_entry.job_id, host.hostname,
318                job_overhead.STATUS.QUEUED, secs_in_queued)
319        self._suite_recorder.record_assignment(queue_entry)
320
321
322    @_timer.decorate
323    def _schedule_jobs(self):
324        """Schedule new jobs against hosts."""
325
326        key = 'host_scheduler.jobs_per_tick'
327        new_jobs_with_hosts = 0
328        queue_entries = self.job_query_manager.get_pending_queue_entries(
329                only_hostless=False)
330        unverified_host_jobs = [job for job in queue_entries
331                                if not job.is_hostless()]
332        if not unverified_host_jobs:
333            return
334        for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
335            self.schedule_host_job(acquisition.host, acquisition.job)
336            self._record_host_assignment(acquisition.host, acquisition.job)
337            new_jobs_with_hosts += 1
338        autotest_stats.Gauge(key).send('new_jobs_with_hosts',
339                                       new_jobs_with_hosts)
340        autotest_stats.Gauge(key).send('new_jobs_without_hosts',
341                                       len(unverified_host_jobs) -
342                                       new_jobs_with_hosts)
343
344
345    @_timer.decorate
346    def _lease_hosts_of_frontend_tasks(self):
347        """Lease hosts of tasks scheduled through the frontend."""
348        # We really don't need to get all the special tasks here, just the ones
349        # without hqes, but reusing the method used by the scheduler ensures
350        # we prioritize the same way.
351        lease_hostnames = [
352                task.host.hostname for task in
353                self.job_query_manager.get_prioritized_special_tasks(
354                    only_tasks_with_leased_hosts=False)
355                if task.queue_entry_id is None and not task.host.leased]
356        # Leasing a leased hosts here shouldn't be a problem:
357        # 1. The only way a host can be leased is if it's been assigned to
358        #    an active hqe or another similar frontend task, but doing so will
359        #    have already precluded it from the list of tasks returned by the
360        #    job_query_manager.
361        # 2. The unleasing is done based on global conditions. Eg: Even if a
362        #    task has already leased a host and we lease it again, the
363        #    host scheduler won't release the host till both tasks are complete.
364        if lease_hostnames:
365            self.host_query_manager.set_leased(
366                    True, hostname__in=lease_hostnames)
367
368
369    def acquire_hosts(self, host_jobs):
370        """Override acquire_hosts.
371
372        This method overrides the method in parent class.
373        It figures out a set of suites that |host_jobs| belong to;
374        and get min_duts requirement for each suite.
375        It pipes min_duts for each suite to rdb.
376
377        """
378        parent_job_ids = set([q.job.parent_job_id
379                              for q in host_jobs if q.job.parent_job_id])
380        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
381        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)
382
383
384    @_timer.decorate
385    def tick(self):
386        logging.info('Calling new tick.')
387        logging.info('Leasing hosts for frontend tasks.')
388        self._lease_hosts_of_frontend_tasks()
389        logging.info('Finding hosts for new jobs.')
390        self._schedule_jobs()
391        logging.info('Releasing unused hosts.')
392        released_hosts = self._release_hosts()
393        logging.info('Updating suite assignment with released hosts')
394        self._suite_recorder.record_release(released_hosts)
395        logging.info('Calling email_manager.')
396        email_manager.manager.send_queued_emails()
397
398
399class DummyHostScheduler(BaseHostScheduler):
400    """A dummy host scheduler that doesn't acquire or release hosts."""
401
402    def __init__(self):
403        pass
404
405
406    def tick(self):
407        pass
408
409
410def handle_signal(signum, frame):
411    """Sigint handler so we don't crash mid-tick."""
412    global _shutdown
413    _shutdown = True
414    logging.info("Shutdown request received.")
415
416
417def initialize(testing=False):
418    """Initialize the host scheduler."""
419    if testing:
420        # Don't import testing utilities unless we're in testing mode,
421        # as the database imports have side effects.
422        from autotest_lib.scheduler import rdb_testing_utils
423        rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing(
424                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
425    global _db_manager
426    _db_manager = scheduler_lib.ConnectionManager()
427    scheduler_lib.setup_logging(
428            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
429            None, timestamped_logfile_prefix='host_scheduler')
430    logging.info("Setting signal handler")
431    signal.signal(signal.SIGINT, handle_signal)
432    signal.signal(signal.SIGTERM, handle_signal)
433    scheduler_models.initialize()
434
435
436def parse_arguments(argv):
437    """
438    Parse command line arguments
439
440    @param argv: argument list to parse
441    @returns:    parsed arguments.
442    """
443    parser = argparse.ArgumentParser(description='Host scheduler.')
444    parser.add_argument('--testing', action='store_true', default=False,
445                        help='Start the host scheduler in testing mode.')
446    parser.add_argument('--production',
447                        help=('Indicate that scheduler is running in production'
448                              ' environment and it can use database that is not'
449                              ' hosted in localhost. If it is set to False, '
450                              'scheduler will fail if database is not in '
451                              'localhost.'),
452                        action='store_true', default=False)
453    options = parser.parse_args(argv)
454
455    return options
456
457
458def main():
459    if _monitor_db_host_acquisition:
460        logging.info('Please set inline_host_acquisition=False in the shadow '
461                     'config before starting the host scheduler.')
462        # The upstart job for the host scheduler understands exit(0) to mean
463        # 'don't respawn'. This is desirable when the job scheduler is acquiring
464        # hosts inline.
465        sys.exit(0)
466    try:
467        options = parse_arguments(sys.argv[1:])
468        scheduler_lib.check_production_settings(options)
469
470        # If server database is enabled, check if the server has role
471        # `host_scheduler`. If the server does not have host_scheduler role,
472        # exception will be raised and host scheduler will not continue to run.
473        if server_manager_utils.use_server_db():
474            server_manager_utils.confirm_server_has_role(hostname='localhost',
475                                                         role='host_scheduler')
476
477        initialize(options.testing)
478
479        # Start the thread to report metadata.
480        metadata_reporter.start()
481
482        host_scheduler = HostScheduler()
483        minimum_tick_sec = global_config.global_config.get_config_value(
484                'SCHEDULER', 'minimum_tick_sec', type=float)
485        while not _shutdown:
486            start = time.time()
487            host_scheduler.tick()
488            curr_tick_sec = time.time() - start
489            if (minimum_tick_sec > curr_tick_sec):
490                time.sleep(minimum_tick_sec - curr_tick_sec)
491            else:
492                time.sleep(0.0001)
493    except Exception:
494        email_manager.manager.log_stacktrace(
495                'Uncaught exception; terminating host_scheduler.')
496        raise
497    finally:
498        email_manager.manager.send_queued_emails()
499        if _db_manager:
500            _db_manager.disconnect()
501        metadata_reporter.abort()
502
503
504if __name__ == '__main__':
505    main()
506