shard_client.py revision 1e1c41b1b4a1b97c0b7086b8430856ed45e064d3
13b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich#!/usr/bin/python
23b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich#pylint: disable-msg=C0111
33b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
43b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
53b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich# Use of this source code is governed by a BSD-style license that can be
63b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich# found in the LICENSE file.
73b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
83b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichimport argparse
93b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichimport logging
103b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichimport os
113b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichimport signal
12f960d89a7b197fe3b3bd28546c6c89c2331b9f14Jakob Juelichimport socket
133b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichimport time
143b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
153b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichimport common
163b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichfrom autotest_lib.frontend import setup_django_environment
173b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichfrom autotest_lib.client.common_lib import error
183b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichfrom autotest_lib.client.common_lib import global_config
191e1c41b1b4a1b97c0b7086b8430856ed45e064d3Gabe Blackfrom autotest_lib.client.common_lib.cros.graphite import autotest_stats
2022dd226625255110c079e979113dcda1f4fa5ea8Prashanth Balasubramanianfrom autotest_lib.frontend.afe import models
213b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichfrom autotest_lib.scheduler import email_manager
225949b4af7a872aeb58e7ad29090812d648725ed5Prashanth Balasubramanianfrom autotest_lib.scheduler import scheduler_lib
238421d5905ab0aed8689c2eea6be8d9c4042ce618Jakob Juelichfrom autotest_lib.server.cros.dynamic_suite import frontend_wrappers
2475be1d3f881ef4f4f9cffe0c38fc3139338d8f84Prashanth Balasubramanianfrom django.db import transaction
253b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
263b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich"""
273b27dbc2358aef655e050a92510ff8e9e080bf81Jakob JuelichAutotest shard client
283b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
293b27dbc2358aef655e050a92510ff8e9e080bf81Jakob JuelichThe shard client can be run as standalone service. It periodically polls the
303b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichmaster in a heartbeat, retrieves new jobs and hosts and inserts them into the
313b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichlocal database.
323b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
333b27dbc2358aef655e050a92510ff8e9e080bf81Jakob JuelichA shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so-called heartbeat requests to the
353b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichglobal AFE, which will then complete the following actions:
363b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
373b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich1. Find the previously created (with atest) record for the shard. Shards are
383b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   identified by their hostnames, specified in the shadow_config.
393b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich2. Take the records that were sent in the heartbeat and insert them into the
403b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   global database.
413b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - This is to set the status of jobs to completed in the master database after
423b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich     they were run by a slave. This is necessary so one can just look at the
433b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich     master's afe to see the statuses of all jobs. Otherwise one would have to
443b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich     check the tko tables or the individual slave AFEs.
453b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich3. Find labels that have been assigned to this shard.
461b52574752be108a743d3b33561c34324f8538e7Jakob Juelich4. Assign hosts that:
471b52574752be108a743d3b33561c34324f8538e7Jakob Juelich   - have the specified label
481b52574752be108a743d3b33561c34324f8538e7Jakob Juelich   - aren't leased
491b52574752be108a743d3b33561c34324f8538e7Jakob Juelich   - have an id which is not in the known_host_ids which were sent in the
501b52574752be108a743d3b33561c34324f8538e7Jakob Juelich     heartbeat request.
511b52574752be108a743d3b33561c34324f8538e7Jakob Juelich5. Assign jobs that:
523b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - depend on the specified label
533b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - haven't been assigned before
543b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - aren't started yet
553b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - aren't completed yet
   - have an id which is not in the known_job_ids which were sent in the
571b52574752be108a743d3b33561c34324f8538e7Jakob Juelich     heartbeat request.
581b52574752be108a743d3b33561c34324f8538e7Jakob Juelich6. Serialize the chosen jobs and hosts.
593b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - Find objects that the Host/Job objects depend on: Labels, AclGroups, Users,
603b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich     and many more. Details about this can be found around
613b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich     model_logic.serialize()
623b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich7. Send these objects to the slave.
633b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
643b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
653b27dbc2358aef655e050a92510ff8e9e080bf81Jakob JuelichOn the client side, this will happen:
663b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich1. Deserialize the objects sent from the master and persist them to the local
673b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   database.
683b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich2. monitor_db on the shard will pick up these jobs and schedule them on the
693b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL.
713b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich4. The shard_client will pick up all jobs where shard_id=NULL and will
723b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   send them to the master in the request of the next heartbeat.
733b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - The master will persist them as described earlier.
743b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - the shard_id will be set back to the shard's id, so the record won't be
753b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich     uploaded again.
   The heartbeat request will also contain the ids of incomplete jobs and the
   ids of all valid hosts (those with invalid=0). This avoids sending the same
   objects repeatedly. For more
781b52574752be108a743d3b33561c34324f8538e7Jakob Juelich   information on this and alternatives considered
791b52574752be108a743d3b33561c34324f8538e7Jakob Juelich   see site_rpc_interface.shard_heartbeat.
803b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich"""
813b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
823b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
# Name of the RPC on the global AFE that is invoked for every heartbeat.
HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'

# Retry settings handed to frontend_wrappers.RetryingAFE for all RPCs.
RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

# Stats prefix for everything this client reports; it embeds the local
# hostname so each shard reports under its own key.
STATS_KEY = 'shard_client.' + socket.gethostname()
timer = autotest_stats.Timer(STATS_KEY)

# The running ShardClient. It is assigned in main_without_exception_handling()
# and read by handle_signal() to request a shutdown.
_heartbeat_client = None
913b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
923b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
class ShardClient(object):
    """Performs client side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        """Create a shard client.

        @param global_afe_hostname: Hostname of the global AFE that heartbeat
                                    RPCs are sent to.
        @param shard_hostname: Hostname of this shard. It is sent in every
                               heartbeat and used to look up the local Shard
                               record.
        @param tick_pause_sec: Seconds to sleep between two ticks in loop().
        """
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        # Set by shutdown(); loop() checks it before starting the next tick.
        self._shutdown = False
        # Lazily populated cache behind the `shard` property.
        self._shard = None


    @timer.decorate
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts and jobs including their dependencies and saves
        them to the local database, committing each object in its own
        transaction. Afterwards, jobs from the response that already have a
        completed host queue entry locally get their shard reset to None, so
        they are uploaded again with the next heartbeat.

        @param heartbeat_response: A dictionary with keys 'hosts' and 'jobs',
                                   as returned by the `shard_heartbeat` rpc
                                   call.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']

        autotest_stats.Gauge(STATS_KEY).send(
            'hosts_received', len(hosts_serialized))
        autotest_stats.Gauge(STATS_KEY).send(
            'jobs_received', len(jobs_serialized))

        for host in hosts_serialized:
            with transaction.commit_on_success():
                models.Host.deserialize(host)
        for job in jobs_serialized:
            with transaction.commit_on_success():
                models.Job.deserialize(job)

        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        models.Job.objects.filter(
                id__in=job_ids,
                hostqueueentry__complete=True).update(shard=None)


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        A shard's object is fetched from the master with the first jobs. It will
        not exist before that time. A found object is cached. A miss is not
        cached, so later accesses retry the lookup.

        @returns: The shard object if it already exists, otherwise None
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this shard.
                # This is okay because then there is nothing to offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        """Return the local jobs that have to be reported to the master.

        @returns: A list of models.Job objects whose shard is None and which
                  have at least one completed host queue entry.
        """
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantic see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
            shard=None,
            hostqueueentry__complete=True).values_list('pk', flat=True))
        # The join above can yield a job once per matching host queue entry,
        # so job_ids may contain duplicates; the pk__in lookup collapses them.
        return list(models.Job.objects.filter(pk__in=job_ids))


    def _mark_jobs_as_uploaded(self, job_ids):
        """Assign the given jobs back to this shard so they aren't re-uploaded.

        @param job_ids: Ids of the jobs that were sent in the last heartbeat.
        """
        # self.shard might be None if no jobs were downloaded yet.
        # But then job_ids is empty, so this is harmless.
        # Even if there were jobs we'd in the worst case upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        """Return all host queue entries of the given jobs.

        @param jobs: Iterable of models.Job objects.
        @returns: A flat list of the jobs' host queue entries, grouped by job
                  in the order the jobs were given.
        """
        return [hqe for job in jobs for hqe in job.hostqueueentry_set.all()]


    def _get_known_ids(self):
        """Return the job and host ids to send in a heartbeat.

        The host and job ids are ids of objects that are already present on the
        shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keep the list of ids
        considerably small. For hosts, only those with invalid=0 are sent.

        @returns: Tuple of two lists. The first one contains job ids, the
                  second one host ids.
        """
        # NOTE: the join on hostqueueentry may list a job once per incomplete
        # host queue entry.
        job_ids = list(models.Job.objects.filter(
            hostqueueentry__complete=False).values_list('id', flat=True))
        host_ids = list(models.Host.objects.filter(
                invalid=0).values_list('id', flat=True))
        return job_ids, host_ids


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See site_rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet: a dict with the keys 'shard_hostname',
                 'known_job_ids', 'known_host_ids', 'jobs' (serialized jobs to
                 upload) and 'hqes' (their serialized host queue entries).
        """
        known_job_ids, known_host_ids = self._get_known_ids()
        logging.info('Known jobs: %s', known_job_ids)

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'jobs': jobs, 'hqes': hqes}


    @timer.decorate
    def do_heartbeat(self):
        """Perform a heartbeat: Retrieve new jobs.

        This function executes a `shard_heartbeat` RPC, sending completed jobs
        along. It records the request and response sizes as gauges. Once the
        RPC has returned, the uploaded jobs are marked as such, and the
        response is processed by storing the returned objects in the local
        database.
        """
        logging.info("Performing heartbeat.")

        packet = self._heartbeat_packet()
        autotest_stats.Gauge(STATS_KEY).send(
                'heartbeat.request_size', len(str(packet)))
        response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
        autotest_stats.Gauge(STATS_KEY).send(
                'heartbeat.response_size', len(str(response)))

        # Only mark jobs once the RPC returned. If it raises, the jobs keep
        # shard=None and are uploaded again with the next heartbeat.
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")


    def tick(self):
        """Performs all tasks the shard clients needs to do periodically."""
        self.do_heartbeat()


    def loop(self):
        """Calls tick() until shutdown() is called.

        Sleeps tick_pause_sec between ticks. Exceptions raised by tick()
        propagate to the caller.
        """
        while not self._shutdown:
            self.tick()
            time.sleep(self.tick_pause_sec)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown = True
2783b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
2793b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def handle_signal(signum, frame):
    """Signal handler for SIGINT/SIGTERM so we don't crash mid-tick.

    Asks the running shard client to stop after its current tick. The
    handlers are installed before the client is created, so a signal can
    arrive while no client exists yet. In that case there is no tick to
    protect, and the process exits immediately instead of failing with an
    AttributeError on None.

    @param signum: The number of the received signal.
    @param frame: The interrupted stack frame (unused).
    @raises SystemExit: with code 128 + signum if no client exists yet.
    """
    if _heartbeat_client is None:
        logging.info('Signal %d received before the shard client was '
                     'started; exiting.', signum)
        raise SystemExit(128 + signum)
    _heartbeat_client.shutdown()
2833b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
2843b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def _get_global_afe_hostname():
    """Return the global AFE's hostname.

    @returns: The value of global_afe_hostname in the SHARD section of the
              global configuration.
    """
    config = global_config.global_config
    return config.get_config_value('SHARD', 'global_afe_hostname')
2893b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
2903b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    A missing or empty SHARD/shard_hostname value means this is not running
    on a shard.

    @returns: The configured shard hostname.
    @raises error.HeartbeatOnlyAllowedInShardModeException if run from
            elsewhere than from a shard.
    """
    shard_hostname = global_config.global_config.get_config_value(
            'SHARD', 'shard_hostname', default=None)
    if shard_hostname:
        return shard_hostname
    raise error.HeartbeatOnlyAllowedInShardModeException(
            'To run the shard client, shard_hostname must neither be None nor '
            'empty.')
3063b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
3073b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def _get_tick_pause_sec():
    """Return the pause between two ticks, in seconds, as a float.

    The value is read from SHARD/heartbeat_pause_sec in the global
    configuration.
    """
    config = global_config.global_config
    return config.get_config_value('SHARD', 'heartbeat_pause_sec', type=float)
3123b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
3133b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def get_shard_client():
    """Build a ShardClient configured from the global configuration.

    @returns A shard client instance.
    @raises error.HeartbeatOnlyAllowedInShardModeException if no
            shard_hostname is configured.
    """
    # Arguments are evaluated left to right: global AFE hostname, then shard
    # hostname (which validates shard mode), then the tick pause.
    return ShardClient(_get_global_afe_hostname(),
                       _get_shard_hostname_and_ensure_running_on_shard(),
                       _get_tick_pause_sec())
3253b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
3263b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def main():
    """Run the shard client and report any uncaught exception.

    An uncaught exception is emailed via email_manager, logged, counted in
    stats and then re-raised. Queued emails are sent whether or not an
    exception occurred.
    """
    try:
        # STATS_KEY has no trailing separator, so add the '.' here to produce
        # 'shard_client.<host>.starts' rather than 'shard_client.<host>starts'.
        autotest_stats.Counter(STATS_KEY + '.starts').increment()
        main_without_exception_handling()
    except Exception:
        message = 'Uncaught exception; terminating shard_client.'
        email_manager.manager.log_stacktrace(message)
        logging.exception(message)
        autotest_stats.Counter(STATS_KEY + '.uncaught_exceptions').increment()
        raise
    finally:
        email_manager.manager.send_queued_emails()
3393b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
3403b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def main_without_exception_handling():
    """Parse arguments, set up logging and signals, then run the client loop.

    Returns once the client loop ends, i.e. after shutdown was requested
    through handle_signal().
    """
    global _heartbeat_client

    # No options are defined yet; parsing still provides --help and rejects
    # unexpected arguments.
    argparse.ArgumentParser(description='Shard client.').parse_args()

    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    scheduler_lib.setup_logging(log_dir, None,
                                timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, handle_signal)

    logging.info("Starting shard client.")
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop()
3573b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
3583b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
# Start the client only when run as a script, not when imported (e.g. by tests).
if __name__ == '__main__':
    main()
361