# pylint: disable=missing-docstring

"""\
Functions to expose over the RPC interface.

For all modify* and delete* functions that ask for an 'id' parameter to
identify the object to operate on, the id may be either
 * the database row ID
 * the name of the object (label name, hostname, user login, etc.)
 * a dictionary containing uniquely identifying fields (this option should
   seldom be used)

When specifying foreign key fields (i.e. adding hosts to a label, or adding
users to an ACL group), the given value may be either the database row ID or
the name of the object.

All get* functions return lists of dictionaries.  Each dictionary represents
one object and maps field names to values.

Some examples:
modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
modify_test('sleeptest', test_type='Client', params=', seconds=60')
delete_acl_group(1) # delete by ID
delete_acl_group('Everyone') # delete by name
acl_group_add_users('Everyone', ['mbligh', 'showard'])
get_jobs(owner='showard', status='Queued')

See doctests/001_rpc_test.txt for (lots) more examples.
"""

__author__ = 'showard@google.com (Steve Howard)'

import ast
import datetime
import logging
import os
import sys

from django.db.models import Count

import common
# TODO(akeshet): Replace with monarch stats once we know how to instrument rpc
# server with ts_mon.
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.frontend.afe import control_file as control_file_lib
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.frontend.afe import model_logic
from autotest_lib.frontend.afe import models
from autotest_lib.frontend.afe import rpc_utils
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
from autotest_lib.server import frontend
from autotest_lib.server import utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import control_file_getter
from autotest_lib.server.cros.dynamic_suite import suite as SuiteBase
from autotest_lib.server.cros.dynamic_suite import tools
from autotest_lib.server.cros.dynamic_suite.suite import Suite
from autotest_lib.server.lib import status_history
from autotest_lib.site_utils import host_history
from autotest_lib.site_utils import job_history
from autotest_lib.site_utils import server_manager_utils
from autotest_lib.site_utils import stable_version_utils


_CONFIG = global_config.global_config

# Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.

# labels

def modify_label(id, **data):
    """Modify a label.

    @param id: id or name of a label. More often a label name.
    @param data: New data for a label.
    """
    label_model = models.Label.smart_get(id)
    label_model.update_object(data)

    # Master forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False,
                             id=id, **data)


def delete_label(id):
    """Delete a label.

    @param id: id or name of a label. More often a label name.
    """
    label_model = models.Label.smart_get(id)
    # Hosts that have the label to be deleted. Save this info before
    # the label is deleted so it can be used later.
    hosts = []
    for h in label_model.host_set.all():
        hosts.append(models.Host.smart_get(h.id))
    label_model.delete()

    # Master forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id)


def add_label(name, ignore_exception_if_exists=False, **kwargs):
    """Adds a new label of a given name.

    @param name: label name.
    @param ignore_exception_if_exists: If True and the exception was
        thrown due to a duplicated label name when adding a label,
        then suppress the exception. Default is False.
    @param kwargs: keyword args that store more info about a label
        other than the name.
    @return: int/long id of a new label.
    """
    # models.Label.add_object() throws model_logic.ValidationError
    # when it is given a label name that already exists.
    # However, ValidationError can be thrown with different errors,
    # and those errors should be thrown up the call chain.
    try:
        label = models.Label.add_object(name=name, **kwargs)
    except:
        exc_info = sys.exc_info()
        if ignore_exception_if_exists:
            label = rpc_utils.get_label(name)
            # If the exception was raised for a reason other than a
            # duplicated "name", then raise the original exception.
            if label is None:
                raise exc_info[0], exc_info[1], exc_info[2]
        else:
            raise exc_info[0], exc_info[1], exc_info[2]
    return label.id

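# Example (a sketch; the label names below are hypothetical):
#
#   pool_id = add_label('pool:bvt', ignore_exception_if_exists=True)
#   # Extra keyword args are stored on the label model, e.g. creating
#   # a platform label:
#   add_label('link', platform=True)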

def add_label_to_hosts(id, hosts):
    """Adds a label of the given id to the given hosts only in local DB.

    @param id: id or name of a label. More often a label name.
    @param hosts: The hostnames of hosts that need the label.

    @raises models.Label.DoesNotExist: If the label with id doesn't exist.
    """
    label = models.Label.smart_get(id)
    host_objs = models.Host.smart_get_bulk(hosts)
    if label.platform:
        models.Host.check_no_platform(host_objs)
    # Ensure a host has no more than one board label.
    if label.name.startswith('board:'):
        models.Host.check_board_labels_allowed(host_objs, [label.name])
    label.host_set.add(*host_objs)


def _create_label_everywhere(id, hosts):
    """
    Yet another method to create labels.

    ALERT! This method should be run only on the master, not on shards!
    DO NOT RUN THIS ON A SHARD!!!  Deputies will hate you if you do!!!

    This method exists primarily to serve label_add_hosts() and
    host_add_labels().  Basically it pulls out the label check/add logic
    from label_add_hosts() into this nice method that not only creates
    the label but also tells the shards that service the hosts to also
    create the label.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    try:
        label = models.Label.smart_get(id)
    except models.Label.DoesNotExist:
        # This matches the type checks in smart_get, which is a hack
        # in and of itself. The aim here is to create any non-existent
        # label, which we cannot do if the 'id' specified isn't a label name.
        if isinstance(id, basestring):
            label = models.Label.smart_get(add_label(id))
        else:
            raise ValueError('Label id (%s) does not exist. Please specify '
                             'the argument, id, as a string (label name).'
                             % id)

    # Make sure the label exists on the shard with the same id
    # as it is on the master.
    # It is possible that the label is already on a shard, because
    # we add a new label only to the shards of hosts to which the label
    # is going to be attached.
    # For example, we add a label L1 to a host in shard S1.
    # The master and S1 will have L1, but other shards won't.
    # Later, when we add the same label L1 to hosts in shards S1 and S2,
    # S1 already has the label but S2 doesn't.
    # S2 should get the new label without any problem.
    # We ignore the exception in such a case.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(
            host_objs, 'add_label', include_hostnames=False,
            name=label.name, ignore_exception_if_exists=True,
            id=label.id, platform=label.platform)


@rpc_utils.route_rpc_to_master
def label_add_hosts(id, hosts):
    """Adds a label with the given id to the given hosts.

    This method should be run only on the master, not on shards.
    The given label will be created if it doesn't exist, provided the `id`
    supplied is a label name, not an int/long id.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.

    @raises ValueError: If the id specified is an int/long (label id)
                        while the label does not exist.
    """
    # Create the label.
    _create_label_everywhere(id, hosts)

    # Add it to the master.
    add_label_to_hosts(id, hosts)

    # Add it to the shards.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id)

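# Example (a sketch; the hostnames and label name are hypothetical):
#
#   # Attach 'pool:bvt' to two hosts; the label is created on the master
#   # and the relevant shards if it does not exist yet.
#   label_add_hosts(id='pool:bvt', hosts=['host1', 'host2'])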

def remove_label_from_hosts(id, hosts):
    """Removes a label of the given id from the given hosts only in local DB.

    @param id: id or name of a label.
    @param hosts: The hostnames of hosts from which to remove the label.
    """
    host_objs = models.Host.smart_get_bulk(hosts)
    models.Label.smart_get(id).host_set.remove(*host_objs)


@rpc_utils.route_rpc_to_master
def label_remove_hosts(id, hosts):
    """Removes a label of the given id from the given hosts.

    This method should be run only on the master, not on shards.

    @param id: id or name of a label.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    host_objs = models.Host.smart_get_bulk(hosts)
    remove_label_from_hosts(id, hosts)

    rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id)


def get_labels(exclude_filters=(), **filter_data):
    """\
    @param exclude_filters: A sequence of dictionaries of filters.

    @returns A sequence of nested dictionaries of label information.
    """
    labels = models.Label.query_objects(filter_data)
    for exclude_filter in exclude_filters:
        labels = labels.exclude(**exclude_filter)
    return rpc_utils.prepare_rows_as_nested_dicts(labels, ())


# hosts

def add_host(hostname, status=None, locked=None, lock_reason='',
             protection=None):
    if locked and not lock_reason:
        raise model_logic.ValidationError(
            {'locked': 'Please provide a reason for locking when adding host.'})

    return models.Host.add_object(hostname=hostname, status=status,
                                  locked=locked, lock_reason=lock_reason,
                                  protection=protection).id


@rpc_utils.route_rpc_to_master
def modify_host(id, **kwargs):
    """Modify local attributes of a host.

    If this is called on the master, but the host is assigned to a shard, this
    will call the `modify_host_local` RPC on the responsible shard. This means
    that if a host is locked using this function, the change also propagates
    to shards.
    When this is called on a shard, the shard just routes the RPC to the master
    and does nothing.

    @param id: id of the host to modify.
    @param kwargs: key=value pairs of values to set on the host.
    """
    rpc_utils.check_modify_host(kwargs)
    host = models.Host.smart_get(id)
    try:
        rpc_utils.check_modify_host_locking(host, kwargs)
    except model_logic.ValidationError as e:
        if not kwargs.get('force_modify_locking', False):
            raise
        logging.exception('The following exception will be ignored and lock '
                          'modification will be enforced. %s', e)

    # This is required to make `lock_time` for a host be exactly the same
    # on the master and on a shard.
    if kwargs.get('locked', None) and 'lock_time' not in kwargs:
        kwargs['lock_time'] = datetime.datetime.now()
    host.update_object(kwargs)

    # force_modify_locking is not a field in the database; remove it.
    kwargs.pop('force_modify_locking', None)
    rpc_utils.fanout_rpc([host], 'modify_host_local',
                         include_hostnames=False, id=id, **kwargs)

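# Example (a sketch; the hostname and reason are hypothetical):
#
#   # Lock a host with a reason; `lock_time` is stamped here so the
#   # master and the owning shard record the identical timestamp.
#   modify_host('host1', locked=True, lock_reason='Debugging a flaky suite')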

def modify_host_local(id, **kwargs):
    """Modify host attributes in local DB.

    @param id: Host id.
    @param kwargs: key=value pairs of values to set on the host.
    """
    models.Host.smart_get(id).update_object(kwargs)


@rpc_utils.route_rpc_to_master
def modify_hosts(host_filter_data, update_data):
    """Modify local attributes of multiple hosts.

    If this is called on the master, and one of the hosts that match the
    filters is assigned to a shard, this will call the `modify_hosts_local`
    RPC on the responsible shard.
    When this is called on a shard, the shard just routes the RPC to the master
    and does nothing.

    The filters are always applied on the master, not on the shards. This means
    if the states of a host differ on the master and a shard, the state on the
    master will be used. I.e. this means:
    A host was synced to Shard 1. On Shard 1 the status of the host was set to
    'Repair Failed'.
    - A call to modify_hosts with host_filter_data={'status': 'Ready'} will
    update the host (both on the shard and on the master), because the state
    of the host as the master knows it is still 'Ready'.
    - A call to modify_hosts with host_filter_data={'status': 'Repair Failed'}
    will not update the host, because the filter doesn't apply on the master.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    update_data = update_data.copy()
    rpc_utils.check_modify_host(update_data)
    hosts = models.Host.query_objects(host_filter_data)

    affected_shard_hostnames = set()
    affected_host_ids = []

    # Check all hosts before changing data for exception safety.
    for host in hosts:
        try:
            rpc_utils.check_modify_host_locking(host, update_data)
        except model_logic.ValidationError as e:
            if not update_data.get('force_modify_locking', False):
                raise
            logging.exception('The following exception will be ignored and '
                              'lock modification will be enforced. %s', e)

        if host.shard:
            affected_shard_hostnames.add(host.shard.rpc_hostname())
            affected_host_ids.append(host.id)

    # This is required to make `lock_time` for a host be exactly the same
    # on the master and on a shard.
    if update_data.get('locked', None) and 'lock_time' not in update_data:
        update_data['lock_time'] = datetime.datetime.now()
    for host in hosts:
        host.update_object(update_data)

    update_data.pop('force_modify_locking', None)
    # Caution: Changing the filter from the original here. See docstring.
    rpc_utils.run_rpc_on_multiple_hostnames(
            'modify_hosts_local', affected_shard_hostnames,
            host_filter_data={'id__in': affected_host_ids},
            update_data=update_data)

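# Example (a sketch; filter and update values are hypothetical):
#
#   # Lock every Ready host. Note the filter is evaluated against the
#   # master's view of host state, per the docstring above.
#   modify_hosts(host_filter_data={'status': 'Ready'},
#                update_data={'locked': True, 'lock_reason': 'maintenance'})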

def modify_hosts_local(host_filter_data, update_data):
    """Modify attributes of hosts in local DB.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    for host in models.Host.query_objects(host_filter_data):
        host.update_object(update_data)


def add_labels_to_host(id, labels):
    """Adds labels to a given host only in local DB.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    label_objs = models.Label.smart_get_bulk(labels)
    models.Host.smart_get(id).labels.add(*label_objs)


@rpc_utils.route_rpc_to_master
def host_add_labels(id, labels):
    """Adds labels to a given host.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.

    @raises ValidationError: If adding more than one platform label, or a
                             list of non-compatible board labels.
    """
    # Create the labels on the master/shards.
    for label in labels:
        _create_label_everywhere(label, [id])

    label_objs = models.Label.smart_get_bulk(labels)
    platforms = [label.name for label in label_objs if label.platform]
    boards = [label.name for label in label_objs
              if label.name.startswith('board:')]
    if len(platforms) > 1 or not utils.board_labels_allowed(boards):
        raise model_logic.ValidationError(
            {'labels': ('Adding more than one platform label, or a list of '
                        'non-compatible board labels: %s %s' %
                        (', '.join(platforms), ', '.join(boards)))})

    host_obj = models.Host.smart_get(id)
    if platforms:
        models.Host.check_no_platform([host_obj])
    if boards:
        models.Host.check_board_labels_allowed([host_obj], labels)
    add_labels_to_host(id, labels)

    rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False,
                         id=id, labels=labels)


def remove_labels_from_host(id, labels):
    """Removes labels from a given host only in local DB.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    label_objs = models.Label.smart_get_bulk(labels)
    models.Host.smart_get(id).labels.remove(*label_objs)


@rpc_utils.route_rpc_to_master
def host_remove_labels(id, labels):
    """Removes labels from a given host.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    remove_labels_from_host(id, labels)

    host_obj = models.Host.smart_get(id)
    rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False,
                         id=id, labels=labels)


def get_host_attribute(attribute, **host_filter_data):
    """
    @param attribute: string name of attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    hosts = rpc_utils.get_host_query((), False, True, host_filter_data)
    hosts = list(hosts)
    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
                                               'attribute_list')
    host_attr_dicts = []
    for host_obj in hosts:
        for attr_obj in host_obj.attribute_list:
            if attr_obj.attribute == attribute:
                host_attr_dicts.append(attr_obj.get_object_dict())
    return rpc_utils.prepare_for_serialization(host_attr_dicts)


def set_host_attribute(attribute, value, **host_filter_data):
    """
    @param attribute: string name of attribute
    @param value: string, or None to delete an attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    assert host_filter_data # disallow accidental actions on all hosts
    hosts = models.Host.query_objects(host_filter_data)
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    for host in hosts:
        host.set_or_delete_attribute(attribute, value)

    # Master forwards this RPC to shards.
    if not utils.is_shard():
        rpc_utils.fanout_rpc(hosts, 'set_host_attribute', False,
                             attribute=attribute, value=value,
                             **host_filter_data)

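# Example (a sketch; the attribute name and filter are hypothetical):
#
#   # Set an attribute on one host; passing value=None deletes it.
#   set_host_attribute('os_type', 'cros', hostname='host1')
#   set_host_attribute('os_type', None, hostname='host1')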

@rpc_utils.forward_single_host_rpc_to_shard
def delete_host(id):
    models.Host.smart_get(id).delete()


def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
              valid_only=True, include_current_job=False, **filter_data):
    """Get a list of dictionaries containing information about hosts.

    @param multiple_labels: match hosts in all of the labels given.  Should
            be a list of label names.
    @param exclude_only_if_needed_labels: Exclude hosts with at least one
            "only_if_needed" label applied.
    @param include_current_job: Set to True to include the ids of the
            currently running job and special task.
    """
    hosts = rpc_utils.get_host_query(multiple_labels,
                                     exclude_only_if_needed_labels,
                                     valid_only, filter_data)
    hosts = list(hosts)
    models.Host.objects.populate_relationships(hosts, models.Label,
                                               'label_list')
    models.Host.objects.populate_relationships(hosts, models.AclGroup,
                                               'acl_list')
    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
                                               'attribute_list')
    host_dicts = []
    for host_obj in hosts:
        host_dict = host_obj.get_object_dict()
        host_dict['labels'] = [label.name for label in host_obj.label_list]
        host_dict['platform'] = rpc_utils.find_platform(host_obj)
        host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
        host_dict['attributes'] = dict((attribute.attribute, attribute.value)
                                       for attribute in host_obj.attribute_list)
        if include_current_job:
            host_dict['current_job'] = None
            host_dict['current_special_task'] = None
            entries = models.HostQueueEntry.objects.filter(
                    host_id=host_dict['id'], active=True, complete=False)
            if entries:
                host_dict['current_job'] = (
                        entries[0].get_object_dict()['job'])
            tasks = models.SpecialTask.objects.filter(
                    host_id=host_dict['id'], is_active=True, is_complete=False)
            if tasks:
                host_dict['current_special_task'] = (
                        '%d-%s' % (tasks[0].get_object_dict()['id'],
                                   tasks[0].get_object_dict()['task'].lower()))
        host_dicts.append(host_dict)
    return rpc_utils.prepare_for_serialization(host_dicts)

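# Example (a sketch; the label and status values are hypothetical):
#
#   # Fetch Ready hosts carrying both labels, including any active job:
#   hosts = get_hosts(multiple_labels=['board:link', 'pool:bvt'],
#                     status='Ready', include_current_job=True)
#   for host in hosts:
#       print host['hostname'], host['labels'], host['current_job']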

def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
                  valid_only=True, **filter_data):
    """
    Same parameters as get_hosts().

    @returns The number of matching hosts.
    """
    hosts = rpc_utils.get_host_query(multiple_labels,
                                     exclude_only_if_needed_labels,
                                     valid_only, filter_data)
    return hosts.count()


# tests

def get_tests(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Test.list_objects(filter_data))


def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
    """Gets the counts of all passed and failed tests from the matching jobs.

    @param job_name_prefix: Name prefix of the jobs to get the summary
           from, e.g., 'butterfly-release/R40-6457.21.0/bvt-cq/'.
    @param label_name: Label that must be set in the jobs, e.g.,
            'cros-version:butterfly-release/R40-6457.21.0'.

    @returns A summary of the counts of all the passed and failed tests.
    """
    job_ids = list(models.Job.objects.filter(
            name__startswith=job_name_prefix,
            dependency_labels__name=label_name).values_list(
                'pk', flat=True))
    summary = {'passed': 0, 'failed': 0}
    if not job_ids:
        return summary

    counts = (tko_models.TestView.objects.filter(
            afe_job_id__in=job_ids).exclude(
                test_name='SERVER_JOB').exclude(
                    test_name__startswith='CLIENT_JOB').values(
                        'status').annotate(
                            count=Count('status')))
    for status in counts:
        if status['status'] == 'GOOD':
            summary['passed'] += status['count']
        else:
            summary['failed'] += status['count']
    return summary

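# Example (the prefix and label are taken from the docstring above):
#
#   summary = get_tests_status_counts_by_job_name_label(
#           'butterfly-release/R40-6457.21.0/bvt-cq/',
#           'cros-version:butterfly-release/R40-6457.21.0')
#   # => {'passed': <count of GOOD tests>, 'failed': <everything else>}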

# profilers

def add_profiler(name, description=None):
    return models.Profiler.add_object(name=name, description=description).id


def modify_profiler(id, **data):
    models.Profiler.smart_get(id).update_object(data)


def delete_profiler(id):
    models.Profiler.smart_get(id).delete()


def get_profilers(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Profiler.list_objects(filter_data))


# users

def get_users(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.User.list_objects(filter_data))


# acl groups

def add_acl_group(name, description=None):
    group = models.AclGroup.add_object(name=name, description=description)
    group.users.add(models.User.current_user())
    return group.id


def modify_acl_group(id, **data):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    group.update_object(data)
    group.add_current_user_if_empty()


def acl_group_add_users(id, users):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    users = models.User.smart_get_bulk(users)
    group.users.add(*users)


def acl_group_remove_users(id, users):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    users = models.User.smart_get_bulk(users)
    group.users.remove(*users)
    group.add_current_user_if_empty()


def acl_group_add_hosts(id, hosts):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    hosts = models.Host.smart_get_bulk(hosts)
    group.hosts.add(*hosts)
    group.on_host_membership_change()


def acl_group_remove_hosts(id, hosts):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    hosts = models.Host.smart_get_bulk(hosts)
    group.hosts.remove(*hosts)
    group.on_host_membership_change()


def delete_acl_group(id):
    models.AclGroup.smart_get(id).delete()


def get_acl_groups(**filter_data):
    acl_groups = models.AclGroup.list_objects(filter_data)
    for acl_group in acl_groups:
        acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])
        acl_group['users'] = [user.login
                              for user in acl_group_obj.users.all()]
        acl_group['hosts'] = [host.hostname
                              for host in acl_group_obj.hosts.all()]
    return rpc_utils.prepare_for_serialization(acl_groups)


# jobs

def generate_control_file(tests=(), profilers=(),
                          client_control_file='', use_container=False,
                          profile_only=None, db_tests=True,
                          test_source_build=None):
    """
    Generates a client-side control file to run tests.

    @param tests List of tests to run. See db_tests for more information.
    @param profilers List of profilers to activate during the job.
    @param client_control_file The contents of a client-side control file to
        run at the end of all tests.  If this is supplied, all tests must be
        client side.
        TODO: in the future we should support server control files directly
        to wrap with a kernel.  That'll require changing the parameter
        name and adding a boolean to indicate if it is a client or server
        control file.
    @param use_container unused argument today.  TODO: Enable containers
        on the host during a client side test.
    @param profile_only A boolean that indicates what default profile_only
        mode to use in the control file. Passing None will generate a
        control file that does not explicitly set the default mode at all.
    @param db_tests: if True, the test objects can be found in the database
                     backing the test model. In this case, tests is a tuple
                     of test IDs which are used to retrieve the test objects
                     from the database. If False, tests is a tuple of test
                     dictionaries stored client-side in the AFE.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.

    @returns a dict with the following keys:
        control_file: str, The control file text.
        is_server: bool, is the control file a server-side control file?
        synch_count: How many machines the job uses per autoserv execution.
            synch_count == 1 means the job is asynchronous.
        dependencies: A list of the names of labels on which the job depends.
    """
    if not tests and not client_control_file:
        return dict(control_file='', is_server=False, synch_count=1,
                    dependencies=[])

    cf_info, test_objects, profiler_objects = (
        rpc_utils.prepare_generate_control_file(tests, profilers,
                                                db_tests))
    cf_info['control_file'] = control_file_lib.generate_control(
        tests=test_objects, profilers=profiler_objects,
        is_server=cf_info['is_server'],
        client_control_file=client_control_file, profile_only=profile_only,
        test_source_build=test_source_build)
    return cf_info

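# Example (a sketch; the test ids are hypothetical):
#
#   # Build a control file for two tests already in the AFE database:
#   cf_info = generate_control_file(tests=(37, 38))
#   # cf_info['control_file'] holds the text; cf_info['is_server'] tells
#   # the caller whether to schedule it as a server-side job.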

def create_job_page_handler(name, priority, control_file, control_type,
                            image=None, hostless=False, firmware_rw_build=None,
                            firmware_ro_build=None, test_source_build=None,
                            is_cloning=False, **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param image: ChromeOS build to be installed on the DUT. Default to None.
    @param firmware_rw_build: Firmware build to update RW firmware. Default to
                              None, i.e., RW firmware will not be updated.
    @param firmware_ro_build: Firmware build to update RO firmware. Default to
                              None, i.e., RO firmware will not be updated.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.
    @param is_cloning: True if creating a cloning job.
    @param kwargs extra args that will be required by create_suite_job or
                  create_job.

    @returns The created Job id number.
    """
    if is_cloning:
        logging.info('Start to clone a new job')
        # When cloning a job, hosts and meta_hosts should not exist together,
        # or host-scheduler would schedule two hqe jobs to one host at the
        # same time and crash itself. Clear meta_hosts for this case.
        if kwargs.get('hosts') and kwargs.get('meta_hosts'):
            kwargs['meta_hosts'] = []
    else:
        logging.info('Start to create a new job')
    control_file = rpc_utils.encode_ascii(control_file)
    if not control_file:
        raise model_logic.ValidationError({
                'control_file' : "Control file cannot be empty"})

    if image and hostless:
        builds = {}
        builds[provision.CROS_VERSION_PREFIX] = image
        if firmware_rw_build:
            builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
        if firmware_ro_build:
            builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
        return create_suite_job(
                name=name, control_file=control_file, priority=priority,
                builds=builds, test_source_build=test_source_build,
                is_cloning=is_cloning, **kwargs)
    return create_job(name, priority, control_file, control_type, image=image,
                      hostless=hostless, **kwargs)


@rpc_utils.route_rpc_to_master
def create_job(
        name,
        priority,
        control_file,
        control_type,
        hosts=(),
        meta_hosts=(),
        one_time_hosts=(),
        synch_count=None,
        is_template=False,
        timeout=None,
        timeout_mins=None,
        max_runtime_mins=None,
        run_verify=False,
        email_list='',
        dependencies=(),
        reboot_before=None,
        reboot_after=None,
        parse_failed_repair=None,
        hostless=False,
        keyvals=None,
        drone_set=None,
        image=None,
        parent_job_id=None,
        test_retry=0,
        run_reset=True,
        require_ssp=None,
        args=(),
        **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param synch_count How many machines the job uses per autoserv execution.
        synch_count == 1 means the job is asynchronous.  If an atomic group is
        given this value is treated as a minimum.
    @param is_template If true then create a template job.
    @param timeout Hours after this call returns until the job times out.
    @param timeout_mins Minutes after this call returns until the job times
        out.
    @param max_runtime_mins Minutes from job starting time until job times out
    @param run_verify Should the host be verified before running the test?
    @param email_list String containing emails to mail when the job is done
    @param dependencies List of label names on which this job depends
    @param reboot_before Never, If dirty, or Always
    @param reboot_after Never, If all tests passed, or Always
    @param parse_failed_repair if true, results of failed repairs launched by
        this job will be parsed as part of the job.
    @param hostless if true, create a hostless job
    @param keyvals dict of keyvals to associate with the job
    @param hosts List of hosts to run job on.
    @param meta_hosts List where each entry is a label name, and for each entry
        one host will be chosen from that label to run the job on.
    @param one_time_hosts List of hosts not in the database to run the job on.
    @param drone_set The name of the drone set to run this test on.
    @param image OS image to install before running job.
    @param parent_job_id id of a job considered to be parent of created job.
    @param test_retry Number of times to retry test if the test did not
        complete successfully. (optional, default: 0)
    @param run_reset Should the host be reset before running the test?
    @param require_ssp Set to True to require server-side packaging to run the
                       test. If it's set to None, drone will still try to run
                       the server side with server-side packaging. If the
                       autotest-server package doesn't exist for the build or
                       image is not set, drone will run the test without server-
                       side packaging. Default is None.
    @param args A list of args to be injected into control file.
    @param kwargs extra keyword args. NOT USED.

    @returns The created Job id number.
    """
    if args:
        control_file = tools.inject_vars({'args': args}, control_file)
    if image:
        dependencies += (provision.image_version_to_label(image),)
    return rpc_utils.create_job_common(
            name=name,
            priority=priority,
            control_type=control_type,
            control_file=control_file,
            hosts=hosts,
            meta_hosts=meta_hosts,
            one_time_hosts=one_time_hosts,
            synch_count=synch_count,
            is_template=is_template,
            timeout=timeout,
            timeout_mins=timeout_mins,
            max_runtime_mins=max_runtime_mins,
            run_verify=run_verify,
            email_list=email_list,
            dependencies=dependencies,
            reboot_before=reboot_before,
            reboot_after=reboot_after,
            parse_failed_repair=parse_failed_repair,
            hostless=hostless,
            keyvals=keyvals,
            drone_set=drone_set,
            parent_job_id=parent_job_id,
            test_retry=test_retry,
            run_reset=run_reset,
            require_ssp=require_ssp)

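# Example (a sketch; all values below are hypothetical, and
# control_file_text stands in for real control file contents):
#
#   create_job(name='dummy_Pass', priority=priorities.Priority.DEFAULT,
#              control_file=control_file_text, control_type='Client',
#              hosts=['host1'], keyvals={'experiment': 'x'})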

def abort_host_queue_entries(**filter_data):
    """\
    Abort a set of host queue entries.

    @return: A list of dictionaries, each containing information
             about an aborted HQE.
    """
    query = models.HostQueueEntry.query_objects(filter_data)

    # Don't allow aborts on:
    #   1. Jobs that have already completed (whether or not they were aborted)
    #   2. Jobs that have already been aborted (but may not have completed)
    query = query.filter(complete=False).filter(aborted=False)
    models.AclGroup.check_abort_permissions(query)
    host_queue_entries = list(query.select_related())
    rpc_utils.check_abort_synchronous_jobs(host_queue_entries)

    models.HostQueueEntry.abort_host_queue_entries(host_queue_entries)
    hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id,
                 'Job name': hqe.job.name} for hqe in host_queue_entries]
    return hqe_info


def abort_special_tasks(**filter_data):
    """\
    Abort the special task, or tasks, specified in the filter.
    """
    query = models.SpecialTask.query_objects(filter_data)
    special_tasks = query.filter(is_active=True)
    for task in special_tasks:
        task.abort()


def _call_special_tasks_on_hosts(task, hosts):
    """\
    Schedules a set of hosts for a special task.

    @returns A list of hostnames that a special task was created for.
    """
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
    if shard_host_map and not utils.is_shard():
        raise ValueError('The following hosts are on shards, please '
                         'follow the link to the shards and create jobs '
                         'there instead. %s.' % shard_host_map)
    for host in hosts:
        models.SpecialTask.schedule_special_task(host, task)
    return list(sorted(host.hostname for host in hosts))


def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
    """Forward special tasks to corresponding shards.

    For the master, when special tasks are fired on hosts that are sharded,
    forward the RPC to the corresponding shards.

    For a shard, create special task records in the local DB.

    @param task: Enum value of frontend.afe.models.SpecialTask.Task
    @param rpc: RPC name to forward.
    @param filter_data: Filter keywords to be used for DB query.

    @return: A list of hostnames that a special task was created for.
    """
    hosts = models.Host.query_objects(filter_data)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts, rpc_hostnames=True)

    # Filter out hosts on a shard from those on the master, forward
    # rpcs to the shard with an additional hostname__in filter, and
    # create a local SpecialTask for each remaining host.
    if shard_host_map and not utils.is_shard():
        hosts = [h for h in hosts if h.shard is None]
        for shard, hostnames in shard_host_map.iteritems():

            # The main client of this module is the frontend website, and
            # it invokes it with an 'id' or an 'id__in' filter. Regardless,
            # the 'hostname' filter should narrow down the list of hosts on
            # each shard even though we supply all the ids in filter_data.
            # This method uses hostname instead of id because it fits better
            # with the overall architecture of redirection functions in
            # rpc_utils.
            shard_filter = filter_data.copy()
            shard_filter['hostname__in'] = hostnames
            rpc_utils.run_rpc_on_multiple_hostnames(
                    rpc, [shard], **shard_filter)

    # There is a race condition here if someone assigns a shard to one of these
    # hosts before we create the task. The host will stay on the master if:
    # 1. The host is not Ready
    # 2. The host is Ready but has a task
    # But if the host is Ready and doesn't have a task yet, it will get sent
    # to the shard as we're creating a task here.

    # Given that we only rarely verify Ready hosts it isn't worth putting this
    # entire method in a transaction. The worst case scenario is that we have
    # a verify running on a Ready host while the shard is using it; if the
    # verify fails no subsequent tasks will be created against the host on the
    # master, and verifies are safe enough that this is OK.
    return _call_special_tasks_on_hosts(task, hosts)


def reverify_hosts(**filter_data):
    """\
    Schedules a set of hosts for verify.

    @returns A list of hostnames that a verify task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)


def repair_hosts(**filter_data):
    """\
    Schedules a set of hosts for repair.

    @returns A list of hostnames that a repair task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data)


def get_jobs(not_yet_run=False, running=False, finished=False,
             suite=False, sub=False, standalone=False, **filter_data):
    """\
    Extra status filter args for get_jobs:
    -not_yet_run: Include only jobs that have not yet started running.
    -running: Include only jobs that have started running but for which not
    all hosts have completed.
    -finished: Include only jobs for which all hosts have completed (or
    aborted).

    Extra type filter args for get_jobs:
    -suite: Include only jobs with child jobs.
    -sub: Include only jobs with a parent job.
    -standalone: Include only jobs with no child or parent jobs.
    At most one of these three fields should be specified.
    """
    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                    running,
                                                    finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    job_dicts = []
    jobs = list(models.Job.query_objects(filter_data))
    models.Job.objects.populate_relationships(jobs, models.Label,
                                              'dependencies')
    models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
    for job in jobs:
        job_dict = job.get_object_dict()
        job_dict['dependencies'] = ','.join(label.name
                                            for label in job.dependencies)
        job_dict['keyvals'] = dict((keyval.key, keyval.value)
                                   for keyval in job.keyvals)
        job_dicts.append(job_dict)
    return rpc_utils.prepare_for_serialization(job_dicts)

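# Example (the owner value is illustrative):
#
#   # All not-yet-run suite jobs owned by one user:
#   get_jobs(not_yet_run=True, suite=True, owner='showard')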

def get_num_jobs(not_yet_run=False, running=False, finished=False,
                 suite=False, sub=False, standalone=False,
                 **filter_data):
    """\
    See get_jobs() for documentation of extra filter parameters.
    """
    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                    running,
                                                    finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    return models.Job.query_count(filter_data)


def get_jobs_summary(**filter_data):
    """\
    Like get_jobs(), but adds 'status_counts' and 'result_counts' fields.

    The 'status_counts' field is a dictionary mapping status strings to the
    number of hosts currently with that status, i.e. {'Queued' : 4,
    'Running' : 2}.

    The 'result_counts' field is piped to tko's rpc_interface and has the
    return format specified under get_group_counts.
    """
    jobs = get_jobs(**filter_data)
    ids = [job['id'] for job in jobs]
    all_status_counts = models.Job.objects.get_status_counts(ids)
    for job in jobs:
        job['status_counts'] = all_status_counts[job['id']]
        job['result_counts'] = tko_rpc_interface.get_status_counts(
                ['afe_job_id', 'afe_job_id'],
                header_groups=[['afe_job_id'], ['afe_job_id']],
                **{'afe_job_id': job['id']})
    return rpc_utils.prepare_for_serialization(jobs)


def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
    """\
    Retrieves all the information needed to clone a job.
    """
    job = models.Job.objects.get(id=id)
    job_info = rpc_utils.get_job_info(job,
                                      preserve_metahosts,
                                      queue_entry_filter_data)

    host_dicts = []
    for host in job_info['hosts']:
        host_dict = get_hosts(id=host.id)[0]
        other_labels = host_dict['labels']
        if host_dict['platform']:
            other_labels.remove(host_dict['platform'])
        host_dict['other_labels'] = ', '.join(other_labels)
        host_dicts.append(host_dict)

    for host in job_info['one_time_hosts']:
        host_dict = dict(hostname=host.hostname,
                         id=host.id,
                         platform='(one-time host)',
                         locked_text='')
        host_dicts.append(host_dict)

    # convert keys from Label objects to strings (names of labels)
    meta_host_counts = dict((meta_host.name, count) for meta_host, count
                            in job_info['meta_host_counts'].iteritems())

    info = dict(job=job.get_object_dict(),
                meta_host_counts=meta_host_counts,
                hosts=host_dicts)
    info['job']['dependencies'] = job_info['dependencies']
    info['hostless'] = job_info['hostless']
    info['drone_set'] = job.drone_set and job.drone_set.name

    image = _get_image_for_job(job, job_info['hostless'])
    if image:
        info['job']['image'] = image

    return rpc_utils.prepare_for_serialization(info)


def _get_image_for_job(job, hostless):
    """Gets the image used for a job.

    Gets the image used for an AFE job from the job's keyvals 'build' or
    'builds'. If that fails, and the job is a hostless job, tries to
    get the image from its control file attributes 'build' or 'builds'.

    TODO(ntang): Needs to handle FAFT with two builds for ro/rw.

    @param job      An AFE job object.
    @param hostless Boolean indicating whether the job is hostless.

    @returns The image build used for the job.
    """
    keyvals = job.keyval_dict()
    image = keyvals.get('build')
    if not image:
        value = keyvals.get('builds')
        builds = None
        if isinstance(value, dict):
            builds = value
        elif isinstance(value, basestring):
            builds = ast.literal_eval(value)
        if builds:
            image = builds.get('cros-version')
    if not image and hostless and job.control_file:
        try:
            control_obj = control_data.parse_control_string(
                    job.control_file)
            if hasattr(control_obj, 'build'):
                image = getattr(control_obj, 'build')
            if not image and hasattr(control_obj, 'builds'):
                builds = getattr(control_obj, 'builds')
                image = builds.get('cros-version')
        except:
            logging.warning('Failed to parse control file for job: %s',
                            job.name)
    return image


def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    @returns A sequence of nested dictionaries of host and job information.
    """
    filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
                                                   'started_on__lte',
                                                   start_time,
                                                   end_time,
                                                   **filter_data)
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.HostQueueEntry.query_objects(filter_data),
            ('host', 'job'))


def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    Get the number of host queue entries associated with this job.
    """
    filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
                                                   'started_on__lte',
                                                   start_time,
                                                   end_time,
                                                   **filter_data)
    return models.HostQueueEntry.query_count(filter_data)


def get_hqe_percentage_complete(**filter_data):
    """
    Computes the fraction of host queue entries matching the given filter data
    that are complete.
    """
    query = models.HostQueueEntry.query_objects(filter_data)
    complete_count = query.filter(complete=True).count()
    total_count = query.count()
    if total_count == 0:
        return 1
    return float(complete_count) / total_count

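# Example (a sketch; the job id and filter keyword are hypothetical, and
# filter keywords follow Django query syntax):
#
#   # Fraction of a job's entries that have finished (1 if none match):
#   get_hqe_percentage_complete(job__id=42)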
1224
1225# special tasks
1226
1227def get_special_tasks(**filter_data):
1228    """Get special task entries from the local database.
1229
1230    Query the special tasks table for tasks matching the given
1231    `filter_data`, and return a list of the results.  No attempt is
1232    made to forward the call to shards; the buck will stop here.
1233    The caller is expected to know the target shard for such reasons
1234    as:
1235      * The caller is a service (such as gs_offloader) configured
1236        to operate on behalf of one specific shard, and no other.
1237      * The caller has a host as a parameter, and knows that this is
1238        the shard assigned to that host.
1239
1240    @param filter_data  Filter keywords to pass to the underlying
1241                        database query.
1242
1243    """
1244    return rpc_utils.prepare_rows_as_nested_dicts(
1245            models.SpecialTask.query_objects(filter_data),
1246            ('host', 'queue_entry'))
1247
1248
1249def get_host_special_tasks(host_id, **filter_data):
1250    """Get special task entries for a given host.
1251
1252    Query the special tasks table for tasks that ran on the host
1253    given by `host_id` and matching the given `filter_data`.
1254    Return a list of the results.  If the host is assigned to a
1255    shard, forward this call to that shard.
1256
1257    @param host_id      Id in the database of the target host.
1258    @param filter_data  Filter keywords to pass to the underlying
1259                        database query.
1260
1261    """
1262    # Retrieve host data even if the host is in an invalid state.
1263    host = models.Host.smart_get(host_id, False)
1264    if not host.shard:
1265        return get_special_tasks(host_id=host_id, **filter_data)
1266    else:
1267        # The return values from AFE methods are post-processed
1268        # objects that aren't JSON-serializable.  So, we have to
1269        # call AFE.run() to get the raw, serializable output from
1270        # the shard.
1271        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
1272        return shard_afe.run('get_special_tasks',
1273                             host_id=host_id, **filter_data)
1274
1275
1276def get_num_special_tasks(**kwargs):
1277    """Get the number of special task entries from the local database.
1278
1279    Query the special tasks table for tasks matching the given 'kwargs',
1280    and return the number of the results. No attempt is made to forward
1281    the call to shards; the buck will stop here.
1282
1283    @param kwargs    Filter keywords to pass to the underlying database query.
1284
1285    """
1286    return models.SpecialTask.query_count(kwargs)
1287
1288
1289def get_host_num_special_tasks(host, **kwargs):
1290    """Get special task entries for a given host.
1291
1292    Query the special tasks table for tasks that ran on the host
1293    given by 'host' and matching the given 'kwargs'.
1294    Return a list of the results.  If the host is assigned to a
1295    shard, forward this call to that shard.
1296
1297    @param host      id or name of a host. More often a hostname.
1298    @param kwargs    Filter keywords to pass to the underlying database query.
1299
1300    """
1301    # Retrieve host data even if the host is in an invalid state.
1302    host_model = models.Host.smart_get(host, False)
1303    if not host_model.shard:
1304        return get_num_special_tasks(host=host, **kwargs)
1305    else:
1306        shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname())
1307        return shard_afe.run('get_num_special_tasks', host=host, **kwargs)


def get_status_task(host_id, end_time):
    """Get the "status task" for a host from the local shard.

    Returns a single special task representing the given host's
    "status task".  The status task is a completed special task that
    identifies whether the corresponding host was working or broken
    when it completed.  A successful task indicates a working host;
    a failed task indicates a broken one.

    This call will not be forwarded to a shard; the receiving server
    must be the shard that owns the host.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    tasklist = rpc_utils.prepare_rows_as_nested_dicts(
            status_history.get_status_task(host_id, end_time),
            ('host', 'queue_entry'))
    return tasklist[0] if tasklist else None


def get_host_status_task(host_id, end_time):
    """Get the "status task" for a host from its owning shard.

    Finds the given host's owning shard, and forwards to it a call
    to `get_status_task()` (see above).

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    host = models.Host.smart_get(host_id)
    if not host.shard:
        return get_status_task(host_id, end_time)
    else:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable.  So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
        return shard_afe.run('get_status_task',
                             host_id=host_id, end_time=end_time)


def get_host_diagnosis_interval(host_id, end_time, success):
    """Find a "diagnosis interval" for a given host.

    A "diagnosis interval" identifies a start and end time where
    the host went from "working" to "broken", or vice versa.  The
    interval's starting time is the starting time of the last status
    task with the old status; the end time is the finish time of the
    first status task with the new status.

    This routine finds the most recent diagnosis interval for the
    given host prior to `end_time`, with a starting status matching
    `success`.  If `success` is true, the interval will start with a
    successful status task; if false, the interval will start with a
    failed status task.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the diagnosis interval.
    @param success      Whether the diagnosis interval should start
                        with a successful or failed status task.

    @return A list of two strings.  The first is the timestamp for
            the beginning of the interval; the second is the
            timestamp for the end.  If the host has never changed
            state, the list is empty.

    """
    host = models.Host.smart_get(host_id)
    if not host.shard or utils.is_shard():
        return status_history.get_diagnosis_interval(
                host_id, end_time, success)
    else:
        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
        return shard_afe.get_host_diagnosis_interval(
                host_id, end_time, success)
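
# An illustrative sketch (the host id and timestamps are made up): for a host
# that went from working to broken before the reference time, a call such as
#
#     get_host_diagnosis_interval(42, '2014-08-07 12:00:00', True)
#
# might return ['2014-08-07 10:02:16', '2014-08-07 10:13:00'], i.e. the start
# of the last successful status task and the finish of the first failed one.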


# support for host detail view

def get_host_queue_entries_and_special_tasks(host, query_start=None,
                                             query_limit=None, start_time=None,
                                             end_time=None):
    """
    @param host         id or name of a host.
    @param query_start  Optional offset into the sorted list of results.
    @param query_limit  Optional maximum number of results to return.
    @param start_time   Optional lower bound on entry start times.
    @param end_time     Optional upper bound on entry start times.

    @returns an interleaved list of HostQueueEntries and SpecialTasks,
            in approximate run order.  Each dict contains keys for type, host,
            job, status, started_on, execution_path, and ID.
    """
    total_limit = None
    if query_limit is not None:
        # Fetch enough rows to cover the requested offset as well;
        # `query_start` may be None, in which case it adds nothing.
        total_limit = (query_start or 0) + query_limit
    filter_data_common = {'host': host,
                          'query_limit': total_limit,
                          'sort_by': ['-id']}

    filter_data_special_tasks = rpc_utils.inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **filter_data_common)

    queue_entries = get_host_queue_entries(
            start_time, end_time, **filter_data_common)
    special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)

    interleaved_entries = rpc_utils.interleave_entries(queue_entries,
                                                       special_tasks)
    if query_start is not None:
        interleaved_entries = interleaved_entries[query_start:]
    if query_limit is not None:
        interleaved_entries = interleaved_entries[:query_limit]
    return rpc_utils.prepare_host_queue_entries_and_special_tasks(
            interleaved_entries, queue_entries)
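
# A hypothetical usage sketch for the host detail view: fetch the second page
# of ten interleaved entries for host 'myhost':
#
#     entries = get_host_queue_entries_and_special_tasks(
#             'myhost', query_start=10, query_limit=10)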


def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
                                                 end_time=None):
    filter_data_common = {'host': host}

    filter_data_queue_entries, filter_data_special_tasks = (
            rpc_utils.inject_times_to_hqe_special_tasks_filters(
                    filter_data_common, start_time, end_time))

    return (models.HostQueueEntry.query_count(filter_data_queue_entries)
            + get_host_num_special_tasks(**filter_data_special_tasks))


# other

def echo(data=""):
    """\
    Returns the string passed in; a basic check that RPC calls can be made
    successfully.
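
    Example: echo('ping') returns 'ping'.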
    """
    return data


def get_motd():
    """\
    Returns the message of the day as a string.
    """
    return rpc_utils.get_motd()


def get_static_data():
    """\
    Returns a dictionary containing a bunch of data that shouldn't change
    often and is otherwise inaccessible.  This includes:

    priorities: List of job priority choices.
    default_priority: Default priority value for new jobs.
    users: Sorted list of all users.
    labels: Sorted list of labels whose names do not start with 'cros-version'
            or 'fw-version'.
    tests: Sorted list of all tests.
    profilers: Sorted list of all profilers.
    current_user: Logged-in username.
    host_statuses: Sorted list of possible Host statuses.
    job_statuses: Sorted list of possible HostQueueEntry statuses.
    job_timeout_default: The default job timeout length in minutes.
    parse_failed_repair_default: Default value for the parse_failed_repair job
            option.
    reboot_before_options: A list of valid RebootBefore string enums.
    reboot_after_options: A list of valid RebootAfter string enums.
    motd: Server's message of the day.
    status_dictionary: A mapping from one-word job status names to a more
            informative description.
    """

    default_drone_set_name = models.DroneSet.default_drone_set_name()
    drone_sets = ([default_drone_set_name] +
                  sorted(drone_set.name for drone_set in
                         models.DroneSet.objects.exclude(
                                 name=default_drone_set_name)))

    result = {}
    result['priorities'] = priorities.Priority.choices()
    result['default_priority'] = 'Default'
    result['max_schedulable_priority'] = priorities.Priority.DEFAULT
    result['users'] = get_users(sort_by=['login'])

    label_exclude_filters = [{'name__startswith': 'cros-version'},
                             {'name__startswith': 'fw-version'},
                             {'name__startswith': 'fwrw-version'},
                             {'name__startswith': 'fwro-version'},
                             {'name__startswith': 'ab-version'},
                             {'name__startswith': 'testbed-version'}]
    result['labels'] = get_labels(
        label_exclude_filters,
        sort_by=['-platform', 'name'])

    result['tests'] = get_tests(sort_by=['name'])
    result['profilers'] = get_profilers(sort_by=['name'])
    result['current_user'] = rpc_utils.prepare_for_serialization(
        models.User.current_user().get_object_dict())
    result['host_statuses'] = sorted(models.Host.Status.names)
    result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
    result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
    result['job_max_runtime_mins_default'] = (
        models.Job.DEFAULT_MAX_RUNTIME_MINS)
    result['parse_failed_repair_default'] = bool(
        models.Job.DEFAULT_PARSE_FAILED_REPAIR)
    result['reboot_before_options'] = model_attributes.RebootBefore.names
    result['reboot_after_options'] = model_attributes.RebootAfter.names
    result['motd'] = rpc_utils.get_motd()
    result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
    result['drone_sets'] = drone_sets

    result['status_dictionary'] = {"Aborted": "Aborted",
                                   "Verifying": "Verifying Host",
                                   "Provisioning": "Provisioning Host",
                                   "Pending": "Waiting on other hosts",
                                   "Running": "Running autoserv",
                                   "Completed": "Autoserv completed",
                                   "Failed": "Failed to complete",
                                   "Queued": "Queued",
                                   "Starting": "Next in host's queue",
                                   "Stopped": "Other host(s) failed verify",
                                   "Parsing": "Awaiting parse of final results",
                                   "Gathering": "Gathering log files",
                                   "Waiting": "Waiting for scheduler action",
                                   "Archiving": "Archiving results",
                                   "Resetting": "Resetting hosts"}

    result['wmatrix_url'] = rpc_utils.get_wmatrix_url()
    result['is_moblab'] = bool(utils.is_moblab())

    return result
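
# Usage sketch: a frontend typically calls this once at page load, e.g.
#
#     static = get_static_data()
#     print static['motd'], static['default_priority']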


def get_server_time():
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M")


def get_hosts_by_attribute(attribute, value):
    """
    Get the list of valid hosts that share the same host attribute value.

    @param attribute: String of the host attribute to check.
    @param value: String of the value that is shared between hosts.

    @returns List of hostnames that all have the same host attribute and
             value.
    """
    hosts = models.HostAttribute.query_objects({'attribute': attribute,
                                                'value': value})
    return [row.host.hostname for row in hosts if row.host.invalid == 0]


def canonicalize_suite_name(suite_name):
    """Canonicalize the suite's name.

    @param suite_name: the name of the suite.
    """
    # Do not change this naming convention without updating
    # site_utils.parse_job_name.
    return 'test_suites/control.%s' % suite_name
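
# For example, canonicalize_suite_name('bvt') returns
# 'test_suites/control.bvt'.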


def formatted_now():
    """Format the current datetime."""
    return datetime.datetime.now().strftime(time_utils.TIME_FMT)


def _get_control_file_by_build(build, ds, suite_name):
    """Return control file contents for |suite_name|.

    Query the dev server at |ds| for the control file |suite_name|, included
    in |build|.

    @param build: unique name by which to refer to the image from now on.
    @param ds: a dev_server.DevServer instance to fetch control file with.
    @param suite_name: canonicalized suite name, e.g. test_suites/control.bvt.
    @raises ControlFileNotFound if a unique suite control file doesn't exist.
    @raises NoControlFileList if we can't list the control files at all.
    @raises ControlFileEmpty if the control file exists on the server, but
                             can't be read.

    @return the contents of the desired control file.
    """
    getter = control_file_getter.DevServerGetter.create(build, ds)
    devserver_name = ds.hostname
    timer = autotest_stats.Timer('control_files.parse.%s.%s' %
                                 (devserver_name.replace('.', '_'),
                                  suite_name.rsplit('.')[-1]))
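    # E.g., for a devserver named 'ds1.example.com' (a hypothetical hostname)
    # and the suite name 'test_suites/control.bvt', the stat key above becomes
    # 'control_files.parse.ds1_example_com.bvt'.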
    # Get the control file for the suite.
    try:
        with timer:
            control_file_in = getter.get_control_file_contents_by_name(
                    suite_name)
    except error.CrosDynamicSuiteException as e:
        raise type(e)('Failed to get control file for %s '
                      '(devserver: %s) (error: %s)' %
                      (build, devserver_name, e))
    if not control_file_in:
        raise error.ControlFileEmpty(
            "Fetching %s returned no data. (devserver: %s)" %
            (suite_name, devserver_name))
    # Force control files to only contain ascii characters.
    try:
        control_file_in.encode('ascii')
    except UnicodeDecodeError as e:
        raise error.ControlFileMalformed(str(e))

    return control_file_in


def _get_control_file_by_suite(suite_name):
    """Get control file contents by suite name.

    @param suite_name: Suite name as string.
    @returns: Control file contents as string.
    """
    getter = control_file_getter.FileSystemGetter(
            [_CONFIG.get_config_value('SCHEDULER',
                                      'drone_installation_directory')])
    return getter.get_control_file_contents_by_name(suite_name)


def _stage_build_artifacts(build, hostname=None):
    """
    Ensure components of |build| necessary for installing images are staged.

    @param build: image we want to stage.
    @param hostname: hostname of a DUT that may run the test. This helps
        locate a devserver closer to the DUTs if needed. Default is None.

    @raises StageControlFileFailure: if the dev server throws 500 while staging
        suite control files.

    @return: dev_server.ImageServer instance to use with this build.
    @return: timings dictionary containing staging start/end times.
    """
    timings = {}
    # Ensure components of |build| necessary for installing images are staged
    # on the dev server. However, set synchronous to False to allow other
    # components to be downloaded in the background.
    ds = dev_server.resolve(build, hostname=hostname)
    ds_name = ds.hostname
    timings[constants.DOWNLOAD_STARTED_TIME] = formatted_now()
    timer = autotest_stats.Timer('control_files.stage.%s' % (
            ds_name.replace('.', '_')))
    try:
        with timer:
            ds.stage_artifacts(image=build, artifacts=['test_suites'])
    except dev_server.DevServerException as e:
        raise error.StageControlFileFailure(
                "Failed to stage %s on %s: %s" % (build, ds_name, e))
    timings[constants.PAYLOAD_FINISHED_TIME] = formatted_now()
    return (ds, timings)
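
# A hypothetical usage sketch (the build name is illustrative):
#
#     ds, timings = _stage_build_artifacts('x86-alex-release/R18-1655.0.0')
#     print timings[constants.DOWNLOAD_STARTED_TIME]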


@rpc_utils.route_rpc_to_master
def create_suite_job(
        name='',
        board='',
        pool='',
        control_file='',
        check_hosts=True,
        num=None,
        file_bugs=False,
        timeout=24,
        timeout_mins=None,
        priority=priorities.Priority.DEFAULT,
        suite_args=None,
        wait_for_results=True,
        job_retry=False,
        max_retries=None,
        max_runtime_mins=None,
        suite_min_duts=0,
        offload_failures_only=False,
        builds=None,
        test_source_build=None,
        run_prod_code=False,
        delay_minutes=0,
        is_cloning=False,
        job_keyvals=None,
        test_args=None,
        **kwargs
):
    """
    Create a job to run a test suite on the given device with the given image.

    When the timeout specified in the control file is reached, the
    job is guaranteed to have completed and results will be available.

    @param name: The test name if control_file is supplied, otherwise the name
                 of the test suite to run, e.g. 'bvt'.
    @param board: the kind of device to run the tests on.
    @param builds: the builds to install e.g.
                   {'cros-version:': 'x86-alex-release/R18-1655.0.0',
                    'fwrw-version:':  'x86-alex-firmware/R36-5771.50.0',
                    'fwro-version:':  'x86-alex-firmware/R36-5771.49.0'}
                   If builds is given a value, it overrides any single `build`
                   argument.
    @param test_source_build: Build that contains the server-side test code.
    @param pool: Specify the pool of machines to use for scheduling
            purposes.
    @param control_file: the control file of the job.
    @param check_hosts: require appropriate live hosts to exist in the lab.
    @param num: Specify the number of machines to schedule across (integer).
                Leave unspecified or use None to use default sharding factor.
    @param file_bugs: File a bug on each test failure in this suite.
    @param timeout: The max lifetime of this suite, in hours.
    @param timeout_mins: The max lifetime of this suite, in minutes. Takes
                         priority over timeout.
    @param priority: Integer denoting priority. Higher is more important.
    @param suite_args: Optional arguments which will be parsed by the suite
                       control file. Used by control.test_that_wrapper to
                       determine which tests to run.
    @param wait_for_results: Set to False to run the suite job without waiting
                             for test jobs to finish. Default is True.
    @param job_retry: Set to True to enable job-level retry. Default is False.
    @param max_retries: Integer, maximum job retries allowed at suite level.
                        None for no max.
    @param max_runtime_mins: Maximum amount of time a job can be running in
                             minutes.
    @param suite_min_duts: Integer. Scheduler will prioritize getting the
                           minimum number of machines for the suite when it is
                           competing with another suite that has a higher
                           priority but already got minimum machines it needs.
    @param offload_failures_only: Only enable gs_offloading for failed jobs.
    @param run_prod_code: If True, the suite will run the test code that
                          lives in prod aka the test code currently on the
                          lab servers. If False, the control files and test
                          code for this suite run will be retrieved from the
                          build artifacts.
    @param delay_minutes: Delay the creation of test jobs for a given number of
                          minutes.
    @param is_cloning: True if creating a cloning job.
    @param job_keyvals: A dict of job keyvals to be injected into the control
                        file.
    @param test_args: A dict of args passed all the way to each individual test
                      that will be actually run.
    @param kwargs: extra keyword args. NOT USED.

    @raises ControlFileNotFound: if a unique suite control file doesn't exist.
    @raises NoControlFileList: if we can't list the control files at all.
    @raises StageControlFileFailure: If the dev server throws 500 while
                                     staging test_suites.
    @raises ControlFileEmpty: if the control file exists on the server, but
                              can't be read.

    @return: the job ID of the suite; -1 on error.
    """
    if type(num) is not int and num is not None:
        raise error.SuiteArgumentException('Ill specified num argument %r. '
                                           'Must be an integer or None.' % num)
    if num == 0:
        logging.warning("Can't run on 0 hosts; using default.")
        num = None

    if builds is None:
        builds = {}

    # Default test source build to CrOS build if it's not specified and
    # run_prod_code is set to False.
    if not run_prod_code:
        test_source_build = Suite.get_test_source_build(
                builds, test_source_build=test_source_build)

    sample_dut = rpc_utils.get_sample_dut(board, pool)

    suite_name = canonicalize_suite_name(name)
    if run_prod_code:
        ds = dev_server.resolve(test_source_build, hostname=sample_dut)
        keyvals = {}
    else:
        (ds, keyvals) = _stage_build_artifacts(
                test_source_build, hostname=sample_dut)
    keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts

    # Do not change this naming convention without updating
    # site_utils.parse_job_name.
    if run_prod_code:
        # If run_prod_code is True, test_source_build is not set; use the
        # first build in the builds list for the suite job name.
        name = '%s-%s' % (builds.values()[0], suite_name)
    else:
        name = '%s-%s' % (test_source_build, suite_name)

    timeout_mins = timeout_mins or timeout * 60
    max_runtime_mins = max_runtime_mins or timeout * 60

    if not board:
        board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]

    if run_prod_code:
        control_file = _get_control_file_by_suite(suite_name)

    if not control_file:
        # No control file was supplied so look it up from the build artifacts.
        control_file = _get_control_file_by_build(
                test_source_build, ds, suite_name)

    # Prepend builds and board to the control file.
    if is_cloning:
        control_file = tools.remove_injection(control_file)

    inject_dict = {
        'board': board,
        # `build` is needed for suites like AU to stage the image inside the
        # suite control file.
        'build': test_source_build,
        'builds': builds,
        'check_hosts': check_hosts,
        'pool': pool,
        'num': num,
        'file_bugs': file_bugs,
        'timeout': timeout,
        'timeout_mins': timeout_mins,
        'devserver_url': ds.url(),
        'priority': priority,
        'suite_args': suite_args,
        'wait_for_results': wait_for_results,
        'job_retry': job_retry,
        'max_retries': max_retries,
        'max_runtime_mins': max_runtime_mins,
        'offload_failures_only': offload_failures_only,
        'test_source_build': test_source_build,
        'run_prod_code': run_prod_code,
        'delay_minutes': delay_minutes,
        'job_keyvals': job_keyvals,
        'test_args': test_args,
    }
    control_file = tools.inject_vars(inject_dict, control_file)

    return rpc_utils.create_job_common(name,
                                       priority=priority,
                                       timeout_mins=timeout_mins,
                                       max_runtime_mins=max_runtime_mins,
                                       control_type='Server',
                                       control_file=control_file,
                                       hostless=True,
                                       keyvals=keyvals)
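
# A minimal invocation sketch (board, build, and pool values are
# illustrative):
#
#     job_id = create_suite_job(
#             name='bvt',
#             board='x86-alex',
#             builds={'cros-version:': 'x86-alex-release/R18-1655.0.0'},
#             pool='bvt')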


def get_job_history(**filter_data):
    """Get history of the job, including the special tasks executed for it.

    @param filter_data: filter for the call, should at least include
                        {'job_id': [job id]}
    @returns: JSON string of the job's history, including information such
              as the hosts that ran the job and the special tasks executed
              before and after the job.
    """
    job_id = filter_data['job_id']
    job_info = job_history.get_job_info(job_id)
    return rpc_utils.prepare_for_serialization(job_info.get_history())


def get_host_history(start_time, end_time, hosts=None, board=None, pool=None):
    """Get history for a list of hosts.

    The return is a JSON string of host history for each host, for example,
    {'172.22.33.51': [{'status': 'Resetting',
                       'start_time': '2014-08-07 10:02:16',
                       'end_time': '2014-08-07 10:03:16',
                       'log_url': 'http://autotest/reset-546546/debug',
                       'dbg_str': 'Task: Special Task 19441991 (host ...)'},
                      {'status': 'Running',
                       'start_time': '2014-08-07 10:03:18',
                       'end_time': '2014-08-07 10:13:00',
                       'log_url': 'http://autotest/reset-546546/debug',
                       'dbg_str': 'HQE: 15305005, for job: 14995562'}
                     ]
    }
    @param start_time: start time to search for history, can be string value or
                       epoch time.
    @param end_time: end time to search for history, can be string value or
                     epoch time.
    @param hosts: A list of hosts to search for history. Default is None.
    @param board: board type of hosts. Default is None.
    @param pool: pool type of hosts. Default is None.
    @returns: JSON string of the host history.
    """
    return rpc_utils.prepare_for_serialization(
            host_history.get_history_details(
                    start_time=start_time, end_time=end_time,
                    hosts=hosts, board=board, pool=pool,
                    process_pool_size=4))


def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
                    known_host_ids=(), known_host_statuses=()):
    """Receive updates for job statuses from shards and assign hosts and jobs.

    @param shard_hostname: Hostname of the calling shard.
    @param jobs: Jobs in serialized form that should be updated with newer
                 status from a shard.
    @param hqes: HostQueueEntries in serialized form that should be updated
                 with newer status from a shard. Note that for every
                 hostqueueentry the corresponding job must be in jobs.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.
    @param known_host_statuses: List of statuses of hosts the shard already has.

    @returns: Serialized representations of hosts, jobs, suite job keyvals
              and their dependencies to be inserted into a shard's database.
    """
    # The following alternatives to sending host and job ids in every heartbeat
    # have been considered:
    # 1. Sending the highest known job and host ids. This would work for jobs:
    #    Newer jobs always have larger ids. Also, if a job is not assigned to a
    #    particular shard during a heartbeat, it never will be assigned to this
    #    shard later.
    #    This is not true for hosts though: A host that is leased won't be sent
    #    to the shard now, but might be sent in a future heartbeat. This means
    #    sometimes hosts should be transferred that have a lower id than the
    #    maximum host id the shard knows.
    # 2. Send the number of jobs/hosts the shard knows to the master in each
    #    heartbeat. Compare these to the number of records that already have
    #    the shard_id set to this shard. In the normal case, they should match.
    #    In case they don't, resend all entities of that type.
    #    This would work well for hosts, because there aren't that many.
    #    Resending all jobs is quite a big overhead though.
    #    Also, this approach might run into edge cases when entities are
    #    ever deleted.
    # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
    #    Using two different approaches isn't consistent and might cause
    #    confusion. Also the issues with the case of deletions might still
    #    occur.
    #
    # The overhead of sending all job and host ids in every heartbeat is low:
    # At peaks one board has about 1200 created but unfinished jobs.
    # See the numbers here: http://goo.gl/gQCGWH
    # Assuming that job ids have 6 digits and that json serialization takes a
    # comma and a space as overhead, the traffic per id sent is about 8 bytes.
    # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
    # A NOT IN query with 5000 ids took about 30ms in tests.
    # These numbers seem low enough to outweigh the disadvantages of the
    # solutions described above.
    timer = autotest_stats.Timer('shard_heartbeat')
    with timer:
        shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
        rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
        assert len(known_host_ids) == len(known_host_statuses)
        for i in range(len(known_host_ids)):
            host_model = models.Host.objects.get(pk=known_host_ids[i])
            if host_model.status != known_host_statuses[i]:
                host_model.status = known_host_statuses[i]
                host_model.save()

        hosts, jobs, suite_keyvals = rpc_utils.find_records_for_shard(
                shard_obj, known_job_ids=known_job_ids,
                known_host_ids=known_host_ids)
        return {
            'hosts': [host.serialize() for host in hosts],
            'jobs': [job.serialize() for job in jobs],
            'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
        }


def get_shards(**filter_data):
    """Return a list of all shards.

    @returns A sequence of nested dictionaries of shard information.
    """
    shards = models.Shard.query_objects(filter_data)
    serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ())
    for serialized, shard in zip(serialized_shards, shards):
        serialized['labels'] = [label.name for label in shard.labels.all()]

    return serialized_shards


def _assign_board_to_shard_precheck(labels):
    """Verify whether board labels are valid to be added to a given shard.

    First, check whether each board label is in the correct format. Second,
    check whether the board label exists. Third, check whether the board has
    already been assigned to a shard.

    @param labels: Board labels separated by commas.

    @raises error.RPCException: If a label provided doesn't start with `board:`
            or the board has been added to a shard already.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: A list of label models that are ready to be added to the shard.
    """
    if not labels:
        # Allow creation of label-less shards (labels='' would otherwise fail
        # the checks below).
        return []
    labels = labels.split(',')
    label_models = []
    for label in labels:
        # Check whether the board label is in the correct format.
        if not label.startswith('board:'):
            raise error.RPCException('Sharding only supports `board:.*` label.')
        # Check whether the board label exists. If not, an exception will be
        # thrown by the smart_get function.
        label = models.Label.smart_get(label)
        # Check whether the board has been sharded already.
        try:
            shard = models.Shard.objects.get(labels=label)
            raise error.RPCException(
                    '%s is already on shard %s' % (label, shard.hostname))
        except models.Shard.DoesNotExist:
            # The board is not on any shard, so it's valid.
            label_models.append(label)
    return label_models


def add_shard(hostname, labels):
    """Add a shard and start running jobs on it.

    @param hostname: The hostname of the shard to be added; needs to be unique.
    @param labels: Board labels separated by commas. Jobs of one of the labels
                   will be assigned to the shard.

    @raises error.RPCException: If a label provided doesn't start with `board:`
            or the board has been added to a shard already.
    @raises model_logic.ValidationError: If a shard with the given hostname
            already exists.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: The id of the added shard.
    """
    labels = _assign_board_to_shard_precheck(labels)
    shard = models.Shard.add_object(hostname=hostname)
    for label in labels:
        shard.labels.add(label)
    return shard.id
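
# A hypothetical usage sketch (the hostname and boards are illustrative):
#
#     shard_id = add_shard('shard1.example.com', 'board:lumpy,board:stumpy')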


def add_board_to_shard(hostname, labels):
    """Add boards to a given shard.

    @param hostname: The hostname of the shard to be changed.
    @param labels: Board labels separated by commas.

    @raises error.RPCException: If a label provided doesn't start with `board:`
            or the board has been added to a shard already.
    @raises models.Label.DoesNotExist: If the label specified doesn't exist.

    @returns: The id of the changed shard.
    """
    labels = _assign_board_to_shard_precheck(labels)
    shard = models.Shard.objects.get(hostname=hostname)
    for label in labels:
        shard.labels.add(label)
    return shard.id


def delete_shard(hostname):
    """Delete a shard and reclaim all resources from it.

    This claims back all assigned hosts from the shard. To ensure all DUTs are
    in a sane state, a Reboot task with highest priority is scheduled for them.
    This reboots the DUTs; any remaining tasks then run on the master's
    drones.

    The procedure for deleting a shard:
        * Lock all unlocked hosts on that shard.
        * Remove shard information.
        * Assign a reboot task with highest priority to these hosts.
        * Unlock these hosts; the reboot tasks then run ahead of all other
          tasks.

    The status of jobs that haven't been reported as finished yet will be
    lost. The master scheduler will pick up the jobs and execute them.

    @param hostname: Hostname of the shard to delete.
    """
    shard = rpc_utils.retrieve_shard(shard_hostname=hostname)
    hostnames_to_lock = [h.hostname for h in
                         models.Host.objects.filter(shard=shard, locked=False)]

    # TODO(beeps): Power off shard.
    # For ChromeOS hosts, a reboot test with the highest priority is added to
    # the DUT. After a reboot, it should be guaranteed that no processes from
    # prior tests that were run by a shard are still running on it.

    # Lock all unlocked hosts.
    dicts = {'locked': True, 'lock_time': datetime.datetime.now()}
    models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts)

    # Remove shard information.
    models.Host.objects.filter(shard=shard).update(shard=None)
    models.Job.objects.filter(shard=shard).update(shard=None)
    shard.labels.clear()
    shard.delete()

    # Assign a reboot task with highest priority: Super.
    t = models.Test.objects.get(name='platform_BootPerfServer:shard')
    c = utils.read_file(os.path.join(common.autotest_dir, t.path))
    if hostnames_to_lock:
        rpc_utils.create_job_common(
                'reboot_dut_for_shard_deletion',
                priority=priorities.Priority.SUPER,
                control_type='Server',
                control_file=c, hosts=hostnames_to_lock)

    # Unlock these shard-related hosts.
    dicts = {'locked': False, 'lock_time': None}
    models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts)


def get_servers(hostname=None, role=None, status=None):
    """Get a list of servers with matching role and status.

    @param hostname: FQDN of the server.
    @param role: Name of the server role, e.g., drone, scheduler. Default to
                 None to match any role.
    @param status: Status of the server, e.g., primary, backup, repair_required.
                   Default to None to match any server status.

    @raises error.RPCException: If the server database is not used.
    @return: A list of server names for servers with matching role and status.
    """
    if not server_manager_utils.use_server_db():
        raise error.RPCException('Server database is not enabled. Please try '
                                 'retrieving servers from the global config.')
    servers = server_manager_utils.get_servers(hostname=hostname, role=role,
                                               status=status)
    return [s.get_details() for s in servers]


@rpc_utils.route_rpc_to_master
def get_stable_version(board=stable_version_utils.DEFAULT, android=False):
    """Get stable version for the given board.

    @param board: Name of the board.
    @param android: If True, the given board is an Android-based device. If
                    False, assume it's a Chrome OS-based device.

    @return: Stable version of the given board. Returns the global config
             value of CROS.stable_cros_version if the stable_versions table
             does not have an entry for the board DEFAULT.
    """
    return stable_version_utils.get(board=board, android=android)


@rpc_utils.route_rpc_to_master
def get_all_stable_versions():
    """Get stable versions for all boards.

    @return: A dictionary of board:version.
    """
    return stable_version_utils.get_all()


@rpc_utils.route_rpc_to_master
def set_stable_version(version, board=stable_version_utils.DEFAULT):
    """Modify stable version for the given board.

    @param version: The new value of stable version for given board.
    @param board: Name of the board, default to value `DEFAULT`.
    """
    stable_version_utils.set(version=version, board=board)


@rpc_utils.route_rpc_to_master
def delete_stable_version(board):
    """Delete stable version for the given board.

    Delete a stable version entry in the afe_stable_versions table for a given
    board, so the default stable version will be used.

    @param board: Name of the board.
    """
    stable_version_utils.delete(board=board)


def get_tests_by_build(build, ignore_invalid_tests=True):
    """Get the tests that are available for the specified build.

    @param build: unique name by which to refer to the image.
    @param ignore_invalid_tests: flag controlling whether unparsable tests are
                                 ignored.

    @return: A sorted list of all tests that are in the build specified.
    """
    # Collect the control files specified in this build
    cfile_getter = control_file_lib._initialize_control_file_getter(build)
    if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
        control_file_info_list = cfile_getter.get_suite_info()
        control_file_list = control_file_info_list.keys()
    else:
        control_file_list = cfile_getter.get_control_file_list()

    test_objects = []
    _id = 0
    for control_file_path in control_file_list:
        # Read and parse the control file
        if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
            control_file = control_file_info_list[control_file_path]
        else:
            control_file = cfile_getter.get_control_file_contents(
                    control_file_path)
        try:
            control_obj = control_data.parse_control_string(control_file)
        except:
            logging.info('Failed to parse control file: %s', control_file_path)
            if not ignore_invalid_tests:
                raise
            # Skip this control file rather than fall through with a stale
            # (or undefined) control_obj from a previous iteration.
            continue

        # Extract the values needed for the AFE from the control_obj.
        # The keys list represents attributes in the control_obj that
        # are required by the AFE
        keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental',
                'test_category', 'test_class', 'dependencies', 'run_verify',
                'sync_count', 'job_retries', 'retries', 'path']

        test_object = {}
        for key in keys:
            test_object[key] = getattr(control_obj, key) if hasattr(
                    control_obj, key) else ''

        # Unfortunately, the AFE expects different key-names for certain
        # values; these must be corrected to avoid the risk of tests
        # being omitted by the AFE.
        # The 'id' is an additional value used in the AFE.
        # The control_data parsing does not reference 'run_reset', but it
        # is also used in the AFE and defaults to True.
        test_object['id'] = _id
        test_object['run_reset'] = True
        test_object['description'] = test_object.get('doc', '')
        test_object['test_time'] = test_object.get('time', 0)
        test_object['test_retry'] = test_object.get('retries', 0)

        # Fix the test name to be consistent with the current presentation
        # of test names in the AFE.
        testpath, subname = os.path.split(control_file_path)
        testname = os.path.basename(testpath)
        subname = subname.split('.')[1:]
        if subname:
            testname = '%s:%s' % (testname, ':'.join(subname))
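        # For example, a control file at path
        # 'client/site_tests/dummy_Pass/control.bluetooth' (a hypothetical
        # path) yields the test name 'dummy_Pass:bluetooth', while a plain
        # 'control' file keeps the bare directory name.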

        test_object['name'] = testname

        # Correct the test path as parse_control_string sets an empty string.
        test_object['path'] = control_file_path

        _id += 1
        test_objects.append(test_object)

    test_objects = sorted(test_objects, key=lambda x: x.get('name'))
    return rpc_utils.prepare_for_serialization(test_objects)