13b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich#!/usr/bin/python
23b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich#pylint: disable-msg=C0111
33b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
43b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
53b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich# Use of this source code is governed by a BSD-style license that can be
63b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich# found in the LICENSE file.
73b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
83b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichimport argparse
98a43715afb478fa8be16187374618be33ff49442MK Ryuimport httplib
103b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichimport logging
113b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichimport os
123b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichimport signal
13f960d89a7b197fe3b3bd28546c6c89c2331b9f14Jakob Juelichimport socket
143b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichimport time
158a43715afb478fa8be16187374618be33ff49442MK Ryuimport urllib2
163b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
173b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichimport common
183b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichfrom autotest_lib.frontend import setup_django_environment
193b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichfrom autotest_lib.client.common_lib import error
203b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichfrom autotest_lib.client.common_lib import global_config
211e1c41b1b4a1b97c0b7086b8430856ed45e064d3Gabe Blackfrom autotest_lib.client.common_lib.cros.graphite import autotest_stats
2222dd226625255110c079e979113dcda1f4fa5ea8Prashanth Balasubramanianfrom autotest_lib.frontend.afe import models
233b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichfrom autotest_lib.scheduler import email_manager
245949b4af7a872aeb58e7ad29090812d648725ed5Prashanth Balasubramanianfrom autotest_lib.scheduler import scheduler_lib
258421d5905ab0aed8689c2eea6be8d9c4042ce618Jakob Juelichfrom autotest_lib.server.cros.dynamic_suite import frontend_wrappers
260cb2a3b1d2d86d70da06a3f45be9297139e48207Fang Dengfrom autotest_lib.server import utils as server_utils
2789cca5d6ef8df35f1b294b16bf536a8f3ffb5efbMK Ryufrom chromite.lib import timeout_util
2875be1d3f881ef4f4f9cffe0c38fc3139338d8f84Prashanth Balasubramanianfrom django.db import transaction
293b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
303b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich"""
313b27dbc2358aef655e050a92510ff8e9e080bf81Jakob JuelichAutotest shard client
323b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
333b27dbc2358aef655e050a92510ff8e9e080bf81Jakob JuelichThe shard client can be run as standalone service. It periodically polls the
343b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichmaster in a heartbeat, retrieves new jobs and hosts and inserts them into the
353b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichlocal database.
363b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
373b27dbc2358aef655e050a92510ff8e9e080bf81Jakob JuelichA shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so-called heartbeat requests to the
393b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelichglobal AFE, which will then complete the following actions:
403b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
413b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich1. Find the previously created (with atest) record for the shard. Shards are
423b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   identified by their hostnames, specified in the shadow_config.
433b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich2. Take the records that were sent in the heartbeat and insert them into the
443b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   global database.
453b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - This is to set the status of jobs to completed in the master database after
463b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich     they were run by a slave. This is necessary so one can just look at the
473b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich     master's afe to see the statuses of all jobs. Otherwise one would have to
483b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich     check the tko tables or the individual slave AFEs.
493b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich3. Find labels that have been assigned to this shard.
501b52574752be108a743d3b33561c34324f8538e7Jakob Juelich4. Assign hosts that:
511b52574752be108a743d3b33561c34324f8538e7Jakob Juelich   - have the specified label
521b52574752be108a743d3b33561c34324f8538e7Jakob Juelich   - aren't leased
531b52574752be108a743d3b33561c34324f8538e7Jakob Juelich   - have an id which is not in the known_host_ids which were sent in the
541b52574752be108a743d3b33561c34324f8538e7Jakob Juelich     heartbeat request.
551b52574752be108a743d3b33561c34324f8538e7Jakob Juelich5. Assign jobs that:
563b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - depend on the specified label
573b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - haven't been assigned before
583b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - aren't started yet
593b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - aren't completed yet
   - have an id which is not in the known_job_ids which were sent in the
611b52574752be108a743d3b33561c34324f8538e7Jakob Juelich     heartbeat request.
621b52574752be108a743d3b33561c34324f8538e7Jakob Juelich6. Serialize the chosen jobs and hosts.
633b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - Find objects that the Host/Job objects depend on: Labels, AclGroups, Users,
643b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich     and many more. Details about this can be found around
653b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich     model_logic.serialize()
663b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich7. Send these objects to the slave.
673b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
683b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
693b27dbc2358aef655e050a92510ff8e9e080bf81Jakob JuelichOn the client side, this will happen:
703b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich1. Deserialize the objects sent from the master and persist them to the local
713b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   database.
723b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich2. monitor_db on the shard will pick up these jobs and schedule them on the
733b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL.
753b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich4. The shard_client will pick up all jobs where shard_id=NULL and will
763b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   send them to the master in the request of the next heartbeat.
773b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - The master will persist them as described earlier.
783b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich   - the shard_id will be set back to the shard's id, so the record won't be
793b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich     uploaded again.
   The heartbeat request will also contain the ids of incomplete jobs and the
   ids and statuses of all valid hosts. The ids are used to avoid sending
   objects repeatedly; the statuses let the master sync host state. For more
   information on this and alternatives considered
   see site_rpc_interface.shard_heartbeat.
843b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich"""
853b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
863b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
# Name of the RPC on the global AFE that performs the shard heartbeat.
HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'

# Retry policy for RPCs to the global AFE; handed to RetryingAFE as its
# timeout_min and delay_sec arguments.
RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

# Stats are keyed by the local hostname so every shard reports separately.
STATS_KEY = 'shard_client.' + socket.gethostname()
timer = autotest_stats.Timer(STATS_KEY)

# The ShardClient that handle_signal() shuts down. Presumably assigned by the
# script's entry point (not visible in this view) -- TODO confirm.
_heartbeat_client = None
953b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
963b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
class ShardClient(object):
    """Performs client side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        """Create a shard client.

        @param global_afe_hostname: Hostname of the global AFE that the
                                    heartbeat RPCs are sent to.
        @param shard_hostname: Hostname of this shard; sent with every
                               heartbeat and used to look up the local Shard
                               record.
        @param tick_pause_sec: Seconds to sleep between two ticks in loop().
        """
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown = False
        # Lazily looked up (and cached) by the `shard` property.
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize data in JSON format to database.

        Deserialize a list of JSON-formatted data to database using Django.
        Every record is stored in its own transaction, and a failing record
        is logged and counted rather than aborting the whole batch.

        The exception handler deliberately wraps the transaction block. If it
        sat inside, commit_on_success would see a normal exit and commit
        whatever the failed record had already written. A half-deserialized
        host or job would then be reported to the master as "known" and might
        never be sent again. With the handler outside, the failed record is
        rolled back and can be retried on a later heartbeat.

        @param serialized_list: A list of JSON-formatted data.
        @param djmodel: Django model type.
        @param message: A string to be used in a logging message.
        """
        for serialized in serialized_list:
            try:
                with transaction.commit_on_success():
                    djmodel.deserialize(serialized)
            except Exception as e:
                logging.error('Deserializing a %s fails: %s, Error: %s',
                              message, serialized, e)
                autotest_stats.Counter(STATS_KEY).increment(
                        'deserialization_failures')


    @timer.decorate
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts, jobs (including their dependencies) and
        suite keyvals and saves them to the local database. Jobs the master
        sent that are already complete locally are marked for re-upload.

        @param heartbeat_response: A dictionary with keys 'hosts', 'jobs' and
                                   'suite_keyvals', as returned by the
                                   `shard_heartbeat` rpc call.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response['suite_keyvals']

        autotest_stats.Gauge(STATS_KEY).send(
                'hosts_received', len(hosts_serialized))
        autotest_stats.Gauge(STATS_KEY).send(
                'jobs_received', len(jobs_serialized))
        autotest_stats.Gauge(STATS_KEY).send(
                'suite_keyvals_received', len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = {kv['job_id']
                                   for kv in suite_keyvals_serialized}
        logging.info('Heartbeat response contains suite_keyvals_for jobs %s',
                     list(parent_jobs_with_keyval))

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        #
        # The ids are resolved once, de-duplicated, up front. The join on
        # hostqueueentry yields one row per complete HQE, so a job can appear
        # several times. Re-evaluating the queryset after update() would also
        # issue another query just for logging.
        completed_job_ids = sorted(set(models.Job.objects.filter(
                id__in=job_ids, hostqueueentry__complete=True).values_list(
                        'id', flat=True)))
        if completed_job_ids:
            models.Job.objects.filter(id__in=completed_job_ids).update(
                    shard=None)
            logging.warning('Following completed jobs are reset shard_id to '
                            'NULL to be uploaded to master again: %s',
                            ', '.join(str(job_id)
                                      for job_id in completed_job_ids))


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        A shard's object is fetched from the master with the first jobs. It will
        not exist before that time.

        @returns: The shard object if it already exists, otherwise None
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this shard.
                # This is okay because then there is nothing to offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        """Return the completed jobs that still have to be sent to the master.

        @returns: A list of models.Job objects, each job at most once.
        """
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantic see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
            shard=None,
            hostqueueentry__complete=True).values_list('pk', flat=True))
        # The join above returns one row per complete HQE; fetching by primary
        # key afterwards yields every job only once.
        return list(models.Job.objects.filter(pk__in=job_ids))


    def _mark_jobs_as_uploaded(self, job_ids):
        """Assign the given jobs to this shard so they are not uploaded again.

        @param job_ids: Ids of the jobs that were sent to the master.
        """
        if not job_ids:
            # Nothing was uploaded; skip both the UPDATE and the Shard lookup
            # done by self.shard.
            return
        # self.shard might be None if no jobs were downloaded yet.
        # Even if there were jobs we'd in the worst case upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        """Return all host queue entries belonging to the given jobs.

        @param jobs: An iterable of models.Job objects.

        @returns: A flat list of the jobs' HostQueueEntry objects.
        """
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on the
        shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keeping the list of id's
        considerably small.

        For hosts, host status in addition to host id are sent to master
        to sync the host status.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        job_ids = list(models.Job.objects.filter(
                hostqueueentry__complete=False).values_list('id', flat=True))
        host_models = models.Host.objects.filter(invalid=0)
        host_ids = []
        host_statuses = []
        for h in host_models:
            host_ids.append(h.id)
            host_statuses.append(h.status)
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See site_rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        logging.info('Known jobs: %s', known_job_ids)

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs, 'hqes': hqes}


    def _heartbeat_failure(self, log_message):
        """Log a failed heartbeat and bump the failure counter.

        @param log_message: Description of what went wrong.
        """
        logging.error("Heartbeat failed. %s", log_message)
        autotest_stats.Counter(STATS_KEY).increment('heartbeat_failures')


    @timer.decorate
    def do_heartbeat(self):
        """Perform a heartbeat: Upload completed jobs and retrieve new ones.

        This function executes a `shard_heartbeat` RPC. The request carries
        the locally completed jobs. If the RPC fails, the failure is logged
        and nothing is marked as uploaded. Otherwise the uploaded jobs are
        marked as such, and the response is processed by storing the returned
        objects in the local database.
        """
        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        autotest_stats.Gauge(STATS_KEY).send(
                'heartbeat.request_size', len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
        # HTTPError subclasses URLError, so it must be caught first.
        except urllib2.HTTPError as e:
            self._heartbeat_failure("HTTPError %d: %s" % (e.code, e.reason))
            return
        except urllib2.URLError as e:
            self._heartbeat_failure("URLError: %s" % e.reason)
            return
        except httplib.HTTPException as e:
            self._heartbeat_failure("HTTPException: %s" % e)
            return
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure("TimeoutError: %s" % e)
            return

        autotest_stats.Gauge(STATS_KEY).send(
                'heartbeat.response_size', len(str(response)))
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")


    def tick(self):
        """Performs all tasks the shard clients needs to do periodically."""
        self.do_heartbeat()


    def loop(self):
        """Calls tick() until shutdown() is called."""
        while not self._shutdown:
            self.tick()
            time.sleep(self.tick_pause_sec)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown = True
3403b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
3413b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def handle_signal(signum, frame):
    """Signal handler (SIGINT/SIGTERM) so we don't crash mid-tick.

    Asks the running shard client to stop after its current tick.

    The handlers are installed before the shard client is created. If a
    signal arrives in that window, there is no client and no tick to protect.
    Previously this raised a confusing NameError/AttributeError, which was
    then reported as an uncaught exception. Now the process exits cleanly
    with the conventional "terminated by signal" status (128 + signum).

    @param signum: Number of the signal that was received.
    @param frame: Current stack frame; unused.
    @raises SystemExit if the shard client has not been created yet.
    """
    client = globals().get('_heartbeat_client')
    if client is None:
        logging.info('Signal %d received before the shard client was '
                     'started; exiting.', signum)
        raise SystemExit(128 + signum)
    client.shutdown()
3453b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
3463b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def _get_shard_hostname_and_ensure_running_on_shard():
    """Return the hostname of the local shard from the global configuration.

    This doubles as a guard: the shard client may only run on a shard, which
    is recognized by a non-empty SHARD/shard_hostname setting.

    @returns The configured shard hostname.
    @raises error.HeartbeatOnlyAllowedInShardModeException if shard_hostname
            is unset or empty, i.e. when not running on a shard.
    """
    config = global_config.global_config
    shard_hostname = config.get_config_value(
            'SHARD', 'shard_hostname', default=None)
    if shard_hostname:
        return shard_hostname
    raise error.HeartbeatOnlyAllowedInShardModeException(
            'To run the shard client, shard_hostname must neither be None nor '
            'empty.')
3623b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
3633b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def _get_tick_pause_sec():
    """Return how long to pause between two ticks, in seconds.

    @returns The SHARD/heartbeat_pause_sec value from the global
             configuration, converted to float.
    """
    pause_sec = global_config.global_config.get_config_value(
            'SHARD', 'heartbeat_pause_sec', type=float)
    return pause_sec
3683b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
3693b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def get_shard_client():
    """Build a ShardClient from the global configuration.

    The client is constructed with three values, in order:
    - the global AFE hostname,
    - the local shard's hostname,
    - the configured pause between ticks.

    @returns A ShardClient instance.
    @raises error.HeartbeatOnlyAllowedInShardModeException if not running on
            a shard (no shard_hostname configured).
    """
    # Arguments are evaluated left to right, so the lookups happen in the
    # same order as before: AFE hostname, shard hostname, pause.
    return ShardClient(
            server_utils.get_global_afe_hostname(),
            _get_shard_hostname_and_ensure_running_on_shard(),
            _get_tick_pause_sec())
3813b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
3823b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def main():
    """Run the shard client and report any fatal error before dying.

    A 'starts' counter is bumped on entry. An uncaught Exception is:
    - queued as an email stack trace,
    - logged,
    - counted as 'uncaught_exceptions',
    - then re-raised.

    Queued emails are flushed on every exit path.
    """
    try:
        autotest_stats.Counter(STATS_KEY).increment('starts')
        main_without_exception_handling()
    except Exception:
        fatal_msg = 'Uncaught exception. Terminating shard_client.'
        email_manager.manager.log_stacktrace(fatal_msg)
        logging.exception(fatal_msg)
        autotest_stats.Counter(STATS_KEY).increment('uncaught_exceptions')
        raise
    finally:
        # Runs on success, on the re-raised error, and on SystemExit alike.
        email_manager.manager.send_queued_emails()
3953b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
3963b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
def main_without_exception_handling():
    """Set up the shard client and run its loop until shutdown is requested.

    The steps are:
    - parse the command line,
    - configure logging (directory taken from AUTOTEST_SCHEDULER_LOG_DIR),
    - install handle_signal for SIGINT/SIGTERM,
    - create the module-level _heartbeat_client and run its loop().

    The loop returns once shutdown() has been called.
    """
    parser = argparse.ArgumentParser(description='Shard client.')
    # No options are defined; parsing still provides --help and rejects
    # unexpected arguments. The result is intentionally not kept.
    parser.parse_args()

    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    logging.info("Starting shard client.")
    # Stored at module level so handle_signal() can reach the client.
    global _heartbeat_client
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop()
4133b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
4143b27dbc2358aef655e050a92510ff8e9e080bf81Jakob Juelich
if __name__ == '__main__':
    # Only start the shard client when executed as a script, not on import.
    main()
417