1#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8import argparse
9import httplib
10import logging
11import os
12import signal
13import socket
14import time
15import urllib2
16
17import common
18from autotest_lib.frontend import setup_django_environment
19from autotest_lib.client.common_lib import error
20from autotest_lib.client.common_lib import global_config
21from autotest_lib.client.common_lib.cros.graphite import autotest_stats
22from autotest_lib.frontend.afe import models
23from autotest_lib.scheduler import email_manager
24from autotest_lib.scheduler import scheduler_lib
25from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
26from autotest_lib.server import utils as server_utils
27from chromite.lib import timeout_util
28from django.db import transaction
29
30"""
31Autotest shard client
32
33The shard client can be run as standalone service. It periodically polls the
34master in a heartbeat, retrieves new jobs and hosts and inserts them into the
35local database.
36
37A shard is set up (by a human) and pointed to the global AFE (cautotest).
38On the shard, this script periodically makes so called heartbeat requests to the
39global AFE, which will then complete the following actions:
40
411. Find the previously created (with atest) record for the shard. Shards are
42   identified by their hostnames, specified in the shadow_config.
432. Take the records that were sent in the heartbeat and insert them into the
44   global database.
45   - This is to set the status of jobs to completed in the master database after
46     they were run by a slave. This is necessary so one can just look at the
47     master's afe to see the statuses of all jobs. Otherwise one would have to
48     check the tko tables or the individual slave AFEs.
493. Find labels that have been assigned to this shard.
504. Assign hosts that:
51   - have the specified label
52   - aren't leased
53   - have an id which is not in the known_host_ids which were sent in the
54     heartbeat request.
555. Assign jobs that:
56   - depend on the specified label
57   - haven't been assigned before
58   - aren't started yet
59   - aren't completed yet
60   - have an id which is not in the jobs_known_ids which were sent in the
61     heartbeat request.
626. Serialize the chosen jobs and hosts.
63   - Find objects that the Host/Job objects depend on: Labels, AclGroups, Users,
64     and many more. Details about this can be found around
65     model_logic.serialize()
667. Send these objects to the slave.
67
68
69On the client side, this will happen:
701. Deserialize the objects sent from the master and persist them to the local
71   database.
722. monitor_db on the shard will pick up these jobs and schedule them on the
73   available hosts (which were retrieved from a heartbeat).
743. Once a job is finished, it's shard_id is set to NULL
754. The shard_client will pick up all jobs where shard_id=NULL and will
76   send them to the master in the request of the next heartbeat.
77   - The master will persist them as described earlier.
78   - the shard_id will be set back to the shard's id, so the record won't be
79     uploaded again.
80   The heartbeat request will also contain the ids of incomplete jobs and the
81   ids of all hosts. This is used to not send objects repeatedly. For more
82   information on this and alternatives considered
83   see site_rpc_interface.shard_heartbeat.
84"""
85
86
87HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'
88
89RPC_TIMEOUT_MIN = 5
90RPC_DELAY_SEC = 5
91
92STATS_KEY = 'shard_client.%s' % socket.gethostname()
93timer = autotest_stats.Timer(STATS_KEY)
94_heartbeat_client = None
95
96
97class ShardClient(object):
98    """Performs client side tasks of sharding, i.e. the heartbeat.
99
100    This class contains the logic to do periodic heartbeats to a global AFE,
101    to retrieve new jobs from it and to report completed jobs back.
102    """
103
104    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
105        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
106                                                 timeout_min=RPC_TIMEOUT_MIN,
107                                                 delay_sec=RPC_DELAY_SEC)
108        self.hostname = shard_hostname
109        self.tick_pause_sec = tick_pause_sec
110        self._shutdown = False
111        self._shard = None
112
113
114    def _deserialize_many(self, serialized_list, djmodel, message):
115        """Deserialize data in JSON format to database.
116
117        Deserialize a list of JSON-formatted data to database using Django.
118
119        @param serialized_list: A list of JSON-formatted data.
120        @param djmodel: Django model type.
121        @param message: A string to be used in a logging message.
122        """
123        for serialized in serialized_list:
124            with transaction.commit_on_success():
125                try:
126                    djmodel.deserialize(serialized)
127                except Exception as e:
128                    logging.error('Deserializing a %s fails: %s, Error: %s',
129                                  message, serialized, e)
130                    autotest_stats.Counter(STATS_KEY).increment(
131                            'deserialization_failures')
132
133
134    @timer.decorate
135    def process_heartbeat_response(self, heartbeat_response):
136        """Save objects returned by a heartbeat to the local database.
137
138        This deseralizes hosts and jobs including their dependencies and saves
139        them to the local database.
140
141        @param heartbeat_response: A dictionary with keys 'hosts' and 'jobs',
142                                   as returned by the `shard_heartbeat` rpc
143                                   call.
144        """
145        hosts_serialized = heartbeat_response['hosts']
146        jobs_serialized = heartbeat_response['jobs']
147        suite_keyvals_serialized = heartbeat_response['suite_keyvals']
148
149        autotest_stats.Gauge(STATS_KEY).send(
150                'hosts_received', len(hosts_serialized))
151        autotest_stats.Gauge(STATS_KEY).send(
152                'jobs_received', len(jobs_serialized))
153        autotest_stats.Gauge(STATS_KEY).send(
154                'suite_keyvals_received', len(suite_keyvals_serialized))
155
156        self._deserialize_many(hosts_serialized, models.Host, 'host')
157        self._deserialize_many(jobs_serialized, models.Job, 'job')
158        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
159                               'jobkeyval')
160
161        host_ids = [h['id'] for h in hosts_serialized]
162        logging.info('Heartbeat response contains hosts %s', host_ids)
163        job_ids = [j['id'] for j in jobs_serialized]
164        logging.info('Heartbeat response contains jobs %s', job_ids)
165        parent_jobs_with_keyval = set([kv['job_id']
166                                       for kv in suite_keyvals_serialized])
167        logging.info('Heartbeat response contains suite_keyvals_for jobs %s',
168                     list(parent_jobs_with_keyval))
169
170        # If the master has just sent any jobs that we think have completed,
171        # re-sync them with the master. This is especially useful when a
172        # heartbeat or job is silently dropped, as the next heartbeat will
173        # have a disagreement. Updating the shard_id to NULL will mark these
174        # jobs for upload on the next heartbeat.
175        job_models = models.Job.objects.filter(
176                id__in=job_ids, hostqueueentry__complete=True)
177        if job_models:
178            job_models.update(shard=None)
179            job_ids_repr = ', '.join([str(job.id) for job in job_models])
180            logging.warn('Following completed jobs are reset shard_id to NULL '
181                         'to be uploaded to master again: %s', job_ids_repr)
182
183
184    @property
185    def shard(self):
186        """Return this shard's own shard object, fetched from the database.
187
188        A shard's object is fetched from the master with the first jobs. It will
189        not exist before that time.
190
191        @returns: The shard object if it already exists, otherwise None
192        """
193        if self._shard is None:
194            try:
195                self._shard = models.Shard.smart_get(self.hostname)
196            except models.Shard.DoesNotExist:
197                # This might happen before any jobs are assigned to this shard.
198                # This is okay because then there is nothing to offload anyway.
199                pass
200        return self._shard
201
202
203    def _get_jobs_to_upload(self):
204        jobs = []
205        # The scheduler sets shard to None upon completion of the job.
206        # For more information on the shard field's semantic see
207        # models.Job.shard. We need to be careful to wait for both the
208        # shard_id and the complete bit here, or we will end up syncing
209        # the job without ever setting the complete bit.
210        job_ids = list(models.Job.objects.filter(
211            shard=None,
212            hostqueueentry__complete=True).values_list('pk', flat=True))
213
214        for job_to_upload in models.Job.objects.filter(pk__in=job_ids).all():
215            jobs.append(job_to_upload)
216        return jobs
217
218
219    def _mark_jobs_as_uploaded(self, job_ids):
220        # self.shard might be None if no jobs were downloaded yet.
221        # But then job_ids is empty, so this is harmless.
222        # Even if there were jobs we'd in the worst case upload them twice.
223        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)
224
225
226    def _get_hqes_for_jobs(self, jobs):
227        hqes = []
228        for job in jobs:
229            hqes.extend(job.hostqueueentry_set.all())
230        return hqes
231
232
233    def _get_known_jobs_and_hosts(self):
234        """Returns lists of host and job info to send in a heartbeat.
235
236        The host and job ids are ids of objects that are already present on the
237        shard and therefore don't need to be sent again.
238
239        For jobs, only incomplete jobs are sent, as the master won't send
240        already completed jobs anyway. This helps keeping the list of id's
241        considerably small.
242
243        For hosts, host status in addition to host id are sent to master
244        to sync the host status.
245
246        @returns: Tuple of three lists. The first one contains job ids, the
247                  second one host ids, and the third one host statuses.
248        """
249        job_ids = list(models.Job.objects.filter(
250                hostqueueentry__complete=False).values_list('id', flat=True))
251        host_models = models.Host.objects.filter(invalid=0)
252        host_ids = []
253        host_statuses = []
254        for h in host_models:
255            host_ids.append(h.id)
256            host_statuses.append(h.status)
257        return job_ids, host_ids, host_statuses
258
259
260    def _heartbeat_packet(self):
261        """Construct the heartbeat packet.
262
263        See site_rpc_interface for a more detailed description of the heartbeat.
264
265        @return: A heartbeat packet.
266        """
267        known_job_ids, known_host_ids, known_host_statuses = (
268                self._get_known_jobs_and_hosts())
269        logging.info('Known jobs: %s', known_job_ids)
270
271        job_objs = self._get_jobs_to_upload()
272        hqes = [hqe.serialize(include_dependencies=False)
273                for hqe in self._get_hqes_for_jobs(job_objs)]
274        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
275        logging.info('Uploading jobs %s', [j['id'] for j in jobs])
276
277        return {'shard_hostname': self.hostname,
278                'known_job_ids': known_job_ids,
279                'known_host_ids': known_host_ids,
280                'known_host_statuses': known_host_statuses,
281                'jobs': jobs, 'hqes': hqes}
282
283
284    def _heartbeat_failure(self, log_message):
285        logging.error("Heartbeat failed. %s", log_message)
286        autotest_stats.Counter(STATS_KEY).increment('heartbeat_failures')
287
288
289    @timer.decorate
290    def do_heartbeat(self):
291        """Perform a heartbeat: Retreive new jobs.
292
293        This function executes a `shard_heartbeat` RPC. It retrieves the
294        response of this call and processes the response by storing the returned
295        objects in the local database.
296        """
297        logging.info("Performing heartbeat.")
298        packet = self._heartbeat_packet()
299        autotest_stats.Gauge(STATS_KEY).send(
300                'heartbeat.request_size', len(str(packet)))
301
302        try:
303            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
304        except urllib2.HTTPError as e:
305            self._heartbeat_failure("HTTPError %d: %s" % (e.code, e.reason))
306            return
307        except urllib2.URLError as e:
308            self._heartbeat_failure("URLError: %s" % e.reason)
309            return
310        except httplib.HTTPException as e:
311            self._heartbeat_failure("HTTPException: %s" % e)
312            return
313        except timeout_util.TimeoutError as e:
314            self._heartbeat_failure("TimeoutError: %s" % e)
315            return
316
317        autotest_stats.Gauge(STATS_KEY).send(
318                'heartbeat.response_size', len(str(response)))
319        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
320        self.process_heartbeat_response(response)
321        logging.info("Heartbeat completed.")
322
323
324    def tick(self):
325        """Performs all tasks the shard clients needs to do periodically."""
326        self.do_heartbeat()
327
328
329    def loop(self):
330        """Calls tick() until shutdown() is called."""
331        while not self._shutdown:
332            self.tick()
333            time.sleep(self.tick_pause_sec)
334
335
336    def shutdown(self):
337        """Stops the shard client after the current tick."""
338        logging.info("Shutdown request received.")
339        self._shutdown = True
340
341
342def handle_signal(signum, frame):
343    """Sigint handler so we don't crash mid-tick."""
344    _heartbeat_client.shutdown()
345
346
347def _get_shard_hostname_and_ensure_running_on_shard():
348    """Read the hostname the local shard from the global configuration.
349
350    Raise an exception if run from elsewhere than a shard.
351
352    @raises error.HeartbeatOnlyAllowedInShardModeException if run from
353            elsewhere than from a shard.
354    """
355    hostname = global_config.global_config.get_config_value(
356        'SHARD', 'shard_hostname', default=None)
357    if not hostname:
358        raise error.HeartbeatOnlyAllowedInShardModeException(
359            'To run the shard client, shard_hostname must neither be None nor '
360            'empty.')
361    return hostname
362
363
364def _get_tick_pause_sec():
365    """Read pause to make between two ticks from the global configuration."""
366    return global_config.global_config.get_config_value(
367        'SHARD', 'heartbeat_pause_sec', type=float)
368
369
370def get_shard_client():
371    """Instantiate a shard client instance.
372
373    Configuration values will be read from the global configuration.
374
375    @returns A shard client instance.
376    """
377    global_afe_hostname = server_utils.get_global_afe_hostname()
378    shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
379    tick_pause_sec = _get_tick_pause_sec()
380    return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)
381
382
383def main():
384    try:
385        autotest_stats.Counter(STATS_KEY).increment('starts')
386        main_without_exception_handling()
387    except Exception as e:
388        message = 'Uncaught exception. Terminating shard_client.'
389        email_manager.manager.log_stacktrace(message)
390        logging.exception(message)
391        autotest_stats.Counter(STATS_KEY).increment('uncaught_exceptions')
392        raise
393    finally:
394        email_manager.manager.send_queued_emails()
395
396
397def main_without_exception_handling():
398    parser = argparse.ArgumentParser(description='Shard client.')
399    options = parser.parse_args()
400
401    scheduler_lib.setup_logging(
402            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
403            None, timestamped_logfile_prefix='shard_client')
404
405    logging.info("Setting signal handler.")
406    signal.signal(signal.SIGINT, handle_signal)
407    signal.signal(signal.SIGTERM, handle_signal)
408
409    logging.info("Starting shard client.")
410    global _heartbeat_client
411    _heartbeat_client = get_shard_client()
412    _heartbeat_client.loop()
413
414
415if __name__ == '__main__':
416    main()
417