rpc_interface.py revision 54a5b675ec32d9fe5ba5593fc9ef8ceb055c0a61
# pylint: disable-msg=C0111

"""\
Functions to expose over the RPC interface.

For all modify* and delete* functions that ask for an 'id' parameter to
identify the object to operate on, the id may be either
 * the database row ID
 * the name of the object (label name, hostname, user login, etc.)
 * a dictionary containing uniquely identifying fields (this option should
   seldom be used)

When specifying foreign key fields (i.e. adding hosts to a label, or adding
users to an ACL group), the given value may be either the database row ID or the
name of the object.

All get* functions return lists of dictionaries.  Each dictionary represents one
object and maps field names to values.

Some examples:
modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
modify_test('sleeptest', test_type='Client', params=', seconds=60')
delete_acl_group(1) # delete by ID
delete_acl_group('Everyone') # delete by name
acl_group_add_users('Everyone', ['mbligh', 'showard'])
get_jobs(owner='showard', status='Queued')

See doctests/001_rpc_test.txt for (lots) more examples.
"""

__author__ = 'showard@google.com (Steve Howard)'

import sys
import datetime
import logging

from django.db.models import Count
import common
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import control_file, rpc_utils
from autotest_lib.frontend.afe import models, model_logic, model_attributes
from autotest_lib.frontend.afe import site_rpc_interface
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
from autotest_lib.server import frontend
from autotest_lib.server import utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import tools
from autotest_lib.site_utils import status_history


_timer = autotest_stats.Timer('rpc_interface')

def get_parameterized_autoupdate_image_url(job):
    """Get the parameterized autoupdate image url from a parameterized job."""
    known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob')
    image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj,
                                                           name='image')
    para_set = job.parameterized_job.parameterizedjobparameter_set
    job_test_para = para_set.get(test_parameter=image_parameter)
    return job_test_para.parameter_value


# labels

def modify_label(id, **data):
    """Modify a label.

    @param id: id or name of a label. More often a label name.
    @param data: New data for a label.
    """
    label_model = models.Label.smart_get(id)
    label_model.update_object(data)

    # Master forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False,
                             id=id, **data)


def delete_label(id):
    """Delete a label.

    @param id: id or name of a label. More often a label name.
    """
    label_model = models.Label.smart_get(id)
    # Hosts that have the label to be deleted. Save this info before
    # the label is deleted to use it later.
    hosts = []
    for h in label_model.host_set.all():
        hosts.append(models.Host.smart_get(h.id))
    label_model.delete()

    # Master forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id)


def add_label(name, ignore_exception_if_exists=False, **kwargs):
    """Adds a new label of a given name.

    @param name: label name.
    @param ignore_exception_if_exists: If True, suppress the exception
        when it was raised because a label with the given name already
        exists. Default is False.
    @param kwargs: keyword args that store more info about a label
        other than the name.
    @return: int/long id of a new label.
    """
    # models.Label.add_object() throws model_logic.ValidationError
    # when it is given a label name that already exists.
    # However, ValidationError can be raised for other reasons as well,
    # and those errors should be propagated up the call chain.
    try:
        label = models.Label.add_object(name=name, **kwargs)
    except:
        exc_info = sys.exc_info()
        if ignore_exception_if_exists:
            label = rpc_utils.get_label(name)
            # If the exception was raised for a reason other than a
            # duplicated "name", re-raise the original exception.
            if label is None:
                raise exc_info[0], exc_info[1], exc_info[2]
        else:
            raise exc_info[0], exc_info[1], exc_info[2]
    return label.id

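# A hedged usage sketch for add_label; the label name below is illustrative,
# not taken from any real deployment:
#
#   # Returns the existing label's id instead of raising if the name is
#   # already taken.
#   label_id = add_label('pool:suites', ignore_exception_if_exists=True)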

def add_label_to_hosts(id, hosts):
    """Adds a label of the given id to the given hosts only in local DB.

    @param id: id or name of a label. More often a label name.
    @param hosts: The hostnames of hosts that need the label.

    @raises models.Label.DoesNotExist: If the label with id doesn't exist.
    """
    label = models.Label.smart_get(id)
    host_objs = models.Host.smart_get_bulk(hosts)
    if label.platform:
        models.Host.check_no_platform(host_objs)
    label.host_set.add(*host_objs)


def _create_label_everywhere(id, hosts):
    """
    Yet another method to create labels.

    ALERT! This method should be run only on the master, not on shards!
    DO NOT RUN THIS ON A SHARD!!!  Deputies will hate you if you do!!!

    This method exists primarily to serve label_add_hosts() and
    host_add_labels().  Basically it pulls out the label check/add logic
    from label_add_hosts() into this nice method that not only creates
    the label but also tells the shards that service the hosts to also
    create the label.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    try:
        label = models.Label.smart_get(id)
    except models.Label.DoesNotExist:
        # This matches the type checks in smart_get, which is a hack
        # in and of itself. The aim here is to create any non-existent
        # label, which we cannot do if the 'id' specified isn't a label name.
        if isinstance(id, basestring):
            label = models.Label.smart_get(add_label(id))
        else:
            raise ValueError('Label id (%s) does not exist. Please specify '
                             'the argument, id, as a string (label name).'
                             % id)

    # Make sure the label exists on the shard with the same id
    # as it is on the master.
    # It is possible that the label is already on a shard, because
    # we add a new label only to the shards of the hosts to which the
    # label is going to be attached.
    # For example, we add a label L1 to a host in shard S1.
    # Master and S1 will have L1 but other shards won't.
    # Later, when we add the same label L1 to hosts in shards S1 and S2,
    # S1 already has the label but S2 doesn't.
    # S2 should have the new label without any problem.
    # We ignore the exception in such a case.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(
            host_objs, 'add_label', include_hostnames=False,
            name=label.name, ignore_exception_if_exists=True,
            id=label.id, platform=label.platform)


@rpc_utils.route_rpc_to_master
def label_add_hosts(id, hosts):
    """Adds a label with the given id to the given hosts.

    This method should be run only on the master, not on shards.
    The given label will be created if it doesn't exist, provided the `id`
    supplied is a label name, not an int/long id.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.

    @raises ValueError: If the id specified is an int/long (label id)
                        while the label does not exist.
    """
    # Create the label.
    _create_label_everywhere(id, hosts)

    # Add it to the master.
    add_label_to_hosts(id, hosts)

    # Add it to the shards.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id)

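# A hedged sketch of the master-side call; the label and hostnames are
# hypothetical:
#
#   # Creates the label if needed, then applies it on the master and on the
#   # shards serving these hosts.
#   label_add_hosts('pool:bvt', ['chromeos1-row1-host1',
#                                'chromeos1-row1-host2'])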
218
219def remove_label_from_hosts(id, hosts):
220    """Removes a label of the given id from the given hosts only in local DB.
221
222    @param id: id or name of a label.
223    @param hosts: The hostnames of hosts that need to remove the label from.
224    """
225    host_objs = models.Host.smart_get_bulk(hosts)
226    models.Label.smart_get(id).host_set.remove(*host_objs)
227
228
229@rpc_utils.route_rpc_to_master
230def label_remove_hosts(id, hosts):
231    """Removes a label of the given id from the given hosts.
232
233    This method should be run only on master not shards.
234
235    @param id: id or name of a label.
236    @param hosts: A list of hostnames or ids. More often hostnames.
237    """
238    host_objs = models.Host.smart_get_bulk(hosts)
239    remove_label_from_hosts(id, hosts)
240
241    rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id)
242
243
def get_labels(exclude_filters=(), **filter_data):
    """\
    @param exclude_filters: A sequence of dictionaries of filters.

    @returns A sequence of nested dictionaries of label information.
    """
    labels = models.Label.query_objects(filter_data)
    for exclude_filter in exclude_filters:
        labels = labels.exclude(**exclude_filter)
    return rpc_utils.prepare_rows_as_nested_dicts(labels, ('atomic_group',))

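# A hedged example of exclude_filters; the filter value is illustrative:
#
#   # All labels except platform labels.
#   labels = get_labels(exclude_filters=({'platform': True},))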

# atomic groups

def add_atomic_group(name, max_number_of_machines=None, description=None):
    return models.AtomicGroup.add_object(
            name=name, max_number_of_machines=max_number_of_machines,
            description=description).id


def modify_atomic_group(id, **data):
    models.AtomicGroup.smart_get(id).update_object(data)


def delete_atomic_group(id):
    models.AtomicGroup.smart_get(id).delete()


def atomic_group_add_labels(id, labels):
    label_objs = models.Label.smart_get_bulk(labels)
    models.AtomicGroup.smart_get(id).label_set.add(*label_objs)


def atomic_group_remove_labels(id, labels):
    label_objs = models.Label.smart_get_bulk(labels)
    models.AtomicGroup.smart_get(id).label_set.remove(*label_objs)


def get_atomic_groups(**filter_data):
    return rpc_utils.prepare_for_serialization(
            models.AtomicGroup.list_objects(filter_data))


# hosts

def add_host(hostname, status=None, locked=None, lock_reason='',
             protection=None):
    if locked and not lock_reason:
        raise model_logic.ValidationError(
            {'locked': 'Please provide a reason for locking when adding host.'})

    return models.Host.add_object(hostname=hostname, status=status,
                                  locked=locked, lock_reason=lock_reason,
                                  protection=protection).id


@rpc_utils.route_rpc_to_master
def modify_host(id, **kwargs):
    """Modify local attributes of a host.

    If this is called on the master, but the host is assigned to a shard,
    this will call the `modify_host_local` RPC on the responsible shard.
    This means that if a host is locked using this function, the change
    will also propagate to the shard.
    When this is called on a shard, the shard just routes the RPC to the
    master and does nothing.

    @param id: id of the host to modify.
    @param kwargs: key=value pairs of values to set on the host.
    """
    rpc_utils.check_modify_host(kwargs)
    host = models.Host.smart_get(id)
    try:
        rpc_utils.check_modify_host_locking(host, kwargs)
    except model_logic.ValidationError as e:
        if not kwargs.get('force_modify_locking', False):
            raise
        logging.exception('The following exception will be ignored and lock '
                          'modification will be enforced. %s', e)

    # This is required to make `lock_time` for a host be exactly the same
    # on the master and on a shard.
    if kwargs.get('locked', None) and 'lock_time' not in kwargs:
        kwargs['lock_time'] = datetime.datetime.now()
    host.update_object(kwargs)

    # force_modify_locking is not a field in the database; remove it
    # before fanning the RPC out.
    kwargs.pop('force_modify_locking', None)
    rpc_utils.fanout_rpc([host], 'modify_host_local',
                         include_hostnames=False, id=id, **kwargs)

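# A hedged sketch; the host id and lock reason are hypothetical:
#
#   # Lock a host everywhere (master plus its shard) with a recorded reason.
#   modify_host(42, locked=True, lock_reason='bad DUT battery')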

def modify_host_local(id, **kwargs):
    """Modify host attributes in local DB.

    @param id: Host id.
    @param kwargs: key=value pairs of values to set on the host.
    """
    models.Host.smart_get(id).update_object(kwargs)


@rpc_utils.route_rpc_to_master
def modify_hosts(host_filter_data, update_data):
    """Modify local attributes of multiple hosts.

    If this is called on the master, and any of the hosts that match the
    filters is assigned to a shard, this will call the `modify_hosts_local`
    RPC on the responsible shard.
    When this is called on a shard, the shard just routes the RPC to the
    master and does nothing.

    The filters are always applied on the master, not on the shards. This
    means that if the state of a host differs between the master and a shard,
    the state on the master is used. For example, suppose a host was synced
    to Shard 1, and on Shard 1 the status of the host was set to
    'Repair Failed'. Then:
    - A call to modify_hosts with host_filter_data={'status': 'Ready'} will
    update the host (both on the shard and on the master), because the state
    of the host as the master knows it is still 'Ready'.
    - A call to modify_hosts with host_filter_data={'status': 'Repair Failed'}
    will not update the host, because the filter doesn't apply on the master.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    update_data = update_data.copy()
    rpc_utils.check_modify_host(update_data)
    hosts = models.Host.query_objects(host_filter_data)

    affected_shard_hostnames = set()
    affected_host_ids = []

    # Check all hosts before changing data for exception safety.
    for host in hosts:
        try:
            rpc_utils.check_modify_host_locking(host, update_data)
        except model_logic.ValidationError as e:
            if not update_data.get('force_modify_locking', False):
                raise
            logging.exception('The following exception will be ignored and '
                              'lock modification will be enforced. %s', e)

        if host.shard:
            affected_shard_hostnames.add(host.shard.rpc_hostname())
            affected_host_ids.append(host.id)

    # This is required to make `lock_time` for a host be exactly the same
    # on the master and on a shard.
    if update_data.get('locked', None) and 'lock_time' not in update_data:
        update_data['lock_time'] = datetime.datetime.now()
    for host in hosts:
        host.update_object(update_data)

    update_data.pop('force_modify_locking', None)
    # Caution: Changing the filter from the original here. See docstring.
    rpc_utils.run_rpc_on_multiple_hostnames(
            'modify_hosts_local', affected_shard_hostnames,
            host_filter_data={'id__in': affected_host_ids},
            update_data=update_data)

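# A hedged sketch; the filter and update values are illustrative:
#
#   # Lock every Ready host in one call; the master fans the change out to
#   # the affected shards by host id.
#   modify_hosts(host_filter_data={'status': 'Ready'},
#                update_data={'locked': True, 'lock_reason': 'maintenance'})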

def modify_hosts_local(host_filter_data, update_data):
    """Modify attributes of hosts in local DB.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    for host in models.Host.query_objects(host_filter_data):
        host.update_object(update_data)


def add_labels_to_host(id, labels):
    """Adds labels to a given host only in local DB.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    label_objs = models.Label.smart_get_bulk(labels)
    models.Host.smart_get(id).labels.add(*label_objs)


@rpc_utils.route_rpc_to_master
def host_add_labels(id, labels):
    """Adds labels to a given host.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.

    @raises ValidationError: If adding more than one platform label.
    """
    # Create the labels on the master/shards.
    for label in labels:
        _create_label_everywhere(label, [id])

    label_objs = models.Label.smart_get_bulk(labels)
    platforms = [label.name for label in label_objs if label.platform]
    if len(platforms) > 1:
        raise model_logic.ValidationError(
            {'labels': 'Adding more than one platform label: %s' %
                       ', '.join(platforms)})

    host_obj = models.Host.smart_get(id)
    if len(platforms) == 1:
        models.Host.check_no_platform([host_obj])
    add_labels_to_host(id, labels)

    rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False,
                         id=id, labels=labels)


def remove_labels_from_host(id, labels):
    """Removes labels from a given host only in local DB.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    label_objs = models.Label.smart_get_bulk(labels)
    models.Host.smart_get(id).labels.remove(*label_objs)


@rpc_utils.route_rpc_to_master
def host_remove_labels(id, labels):
    """Removes labels from a given host.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    remove_labels_from_host(id, labels)

    host_obj = models.Host.smart_get(id)
    rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False,
                         id=id, labels=labels)


def get_host_attribute(attribute, **host_filter_data):
    """
    @param attribute: string name of attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    hosts = rpc_utils.get_host_query((), False, False, True, host_filter_data)
    hosts = list(hosts)
    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
                                               'attribute_list')
    host_attr_dicts = []
    for host_obj in hosts:
        for attr_obj in host_obj.attribute_list:
            if attr_obj.attribute == attribute:
                host_attr_dicts.append(attr_obj.get_object_dict())
    return rpc_utils.prepare_for_serialization(host_attr_dicts)


def set_host_attribute(attribute, value, **host_filter_data):
    """
    @param attribute: string name of attribute
    @param value: string, or None to delete an attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    assert host_filter_data # disallow accidental actions on all hosts
    hosts = models.Host.query_objects(host_filter_data)
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    for host in hosts:
        host.set_or_delete_attribute(attribute, value)

    # Master forwards this RPC to shards.
    if not utils.is_shard():
        rpc_utils.fanout_rpc(hosts, 'set_host_attribute', False,
                attribute=attribute, value=value, **host_filter_data)

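# A hedged sketch; the attribute name and hostname are hypothetical:
#
#   # Set an attribute on one host; passing value=None would delete it.
#   set_host_attribute('outlet', 'rack1-outlet3',
#                      hostname='chromeos1-row1-host1')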

@rpc_utils.forward_single_host_rpc_to_shard
def delete_host(id):
    models.Host.smart_get(id).delete()


def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
              exclude_atomic_group_hosts=False, valid_only=True,
              include_current_job=False, **filter_data):
    """Get a list of dictionaries which contains the information of hosts.

    @param multiple_labels: match hosts in all of the labels given.  Should
            be a list of label names.
    @param exclude_only_if_needed_labels: Exclude hosts with at least one
            "only_if_needed" label applied.
    @param exclude_atomic_group_hosts: Exclude hosts that have one or more
            atomic group labels associated with them.
    @param valid_only: Include only valid (not marked invalid) hosts.
    @param include_current_job: Set to True to include ids of currently running
            job and special task.
    """
    hosts = rpc_utils.get_host_query(multiple_labels,
                                     exclude_only_if_needed_labels,
                                     exclude_atomic_group_hosts,
                                     valid_only, filter_data)
    hosts = list(hosts)
    models.Host.objects.populate_relationships(hosts, models.Label,
                                               'label_list')
    models.Host.objects.populate_relationships(hosts, models.AclGroup,
                                               'acl_list')
    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
                                               'attribute_list')
    host_dicts = []
    for host_obj in hosts:
        host_dict = host_obj.get_object_dict()
        host_dict['labels'] = [label.name for label in host_obj.label_list]
        host_dict['platform'], host_dict['atomic_group'] = (rpc_utils.
                find_platform_and_atomic_group(host_obj))
        host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
        host_dict['attributes'] = dict((attribute.attribute, attribute.value)
                                       for attribute in host_obj.attribute_list)
        if include_current_job:
            host_dict['current_job'] = None
            host_dict['current_special_task'] = None
            entries = models.HostQueueEntry.objects.filter(
                    host_id=host_dict['id'], active=True, complete=False)
            if entries:
                host_dict['current_job'] = (
                        entries[0].get_object_dict()['job'])
            tasks = models.SpecialTask.objects.filter(
                    host_id=host_dict['id'], is_active=True, is_complete=False)
            if tasks:
                host_dict['current_special_task'] = (
                        '%d-%s' % (tasks[0].get_object_dict()['id'],
                                   tasks[0].get_object_dict()['task'].lower()))
        host_dicts.append(host_dict)
    return rpc_utils.prepare_for_serialization(host_dicts)

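# A hedged sketch of a typical query; the label names are illustrative:
#
#   # Ready hosts carrying both labels, with current job/task info attached.
#   hosts = get_hosts(multiple_labels=['board:lumpy', 'pool:bvt'],
#                     status='Ready', include_current_job=True)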

def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
                  exclude_atomic_group_hosts=False, valid_only=True,
                  **filter_data):
    """
    Same parameters as get_hosts().

    @returns The number of matching hosts.
    """
    hosts = rpc_utils.get_host_query(multiple_labels,
                                     exclude_only_if_needed_labels,
                                     exclude_atomic_group_hosts,
                                     valid_only, filter_data)
    return hosts.count()


# tests

def add_test(name, test_type, path, author=None, dependencies=None,
             experimental=True, run_verify=None, test_class=None,
             test_time=None, test_category=None, description=None,
             sync_count=1):
    return models.Test.add_object(name=name, test_type=test_type, path=path,
                                  author=author, dependencies=dependencies,
                                  experimental=experimental,
                                  run_verify=run_verify, test_time=test_time,
                                  test_category=test_category,
                                  sync_count=sync_count,
                                  test_class=test_class,
                                  description=description).id


def modify_test(id, **data):
    models.Test.smart_get(id).update_object(data)


def delete_test(id):
    models.Test.smart_get(id).delete()


def get_tests(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Test.list_objects(filter_data))


@_timer.decorate
def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
    """Gets the counts of all passed and failed tests from the matching jobs.

    @param job_name_prefix: Name prefix of the jobs to get the summary from,
            e.g., 'butterfly-release/R40-6457.21.0/bvt-cq/'.
    @param label_name: Label that must be set in the jobs, e.g.,
            'cros-version:butterfly-release/R40-6457.21.0'.

    @returns A summary of the counts of all the passed and failed tests.
    """
    job_ids = list(models.Job.objects.filter(
            name__startswith=job_name_prefix,
            dependency_labels__name=label_name).values_list(
                'pk', flat=True))
    summary = {'passed': 0, 'failed': 0}
    if not job_ids:
        return summary

    counts = (tko_models.TestView.objects.filter(
            afe_job_id__in=job_ids).exclude(
                test_name='SERVER_JOB').exclude(
                    test_name__startswith='CLIENT_JOB').values(
                        'status').annotate(
                            count=Count('status')))
    for status in counts:
        if status['status'] == 'GOOD':
            summary['passed'] += status['count']
        else:
            summary['failed'] += status['count']
    return summary

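# A hedged sketch; the prefix and label mirror the docstring's examples:
#
#   counts = get_tests_status_counts_by_job_name_label(
#           'butterfly-release/R40-6457.21.0/bvt-cq/',
#           'cros-version:butterfly-release/R40-6457.21.0')
#   # -> e.g. {'passed': 412, 'failed': 3}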

# profilers

def add_profiler(name, description=None):
    return models.Profiler.add_object(name=name, description=description).id


def modify_profiler(id, **data):
    models.Profiler.smart_get(id).update_object(data)


def delete_profiler(id):
    models.Profiler.smart_get(id).delete()


def get_profilers(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Profiler.list_objects(filter_data))


# users

def add_user(login, access_level=None):
    return models.User.add_object(login=login, access_level=access_level).id


def modify_user(id, **data):
    models.User.smart_get(id).update_object(data)


def delete_user(id):
    models.User.smart_get(id).delete()


def get_users(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.User.list_objects(filter_data))


# acl groups

def add_acl_group(name, description=None):
    group = models.AclGroup.add_object(name=name, description=description)
    group.users.add(models.User.current_user())
    return group.id


def modify_acl_group(id, **data):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    group.update_object(data)
    group.add_current_user_if_empty()


def acl_group_add_users(id, users):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    users = models.User.smart_get_bulk(users)
    group.users.add(*users)


def acl_group_remove_users(id, users):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    users = models.User.smart_get_bulk(users)
    group.users.remove(*users)
    group.add_current_user_if_empty()


def acl_group_add_hosts(id, hosts):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    hosts = models.Host.smart_get_bulk(hosts)
    group.hosts.add(*hosts)
    group.on_host_membership_change()


def acl_group_remove_hosts(id, hosts):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    hosts = models.Host.smart_get_bulk(hosts)
    group.hosts.remove(*hosts)
    group.on_host_membership_change()


def delete_acl_group(id):
    models.AclGroup.smart_get(id).delete()


def get_acl_groups(**filter_data):
    acl_groups = models.AclGroup.list_objects(filter_data)
    for acl_group in acl_groups:
        acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])
        acl_group['users'] = [user.login
                              for user in acl_group_obj.users.all()]
        acl_group['hosts'] = [host.hostname
                              for host in acl_group_obj.hosts.all()]
    return rpc_utils.prepare_for_serialization(acl_groups)


# jobs

def generate_control_file(tests=(), kernel=None, label=None, profilers=(),
                          client_control_file='', use_container=False,
                          profile_only=None, upload_kernel_config=False,
                          db_tests=True):
    """
    Generates a client-side control file to load a kernel and run tests.

    @param tests List of tests to run. See db_tests for more information.
    @param kernel A list of kernel info dictionaries configuring which kernels
        to boot for this job and other options for them
    @param label Name of label to grab kernel config from.
    @param profilers List of profilers to activate during the job.
    @param client_control_file The contents of a client-side control file to
        run at the end of all tests.  If this is supplied, all tests must be
        client side.
        TODO: in the future we should support server control files directly
        to wrap with a kernel.  That'll require changing the parameter
        name and adding a boolean to indicate if it is a client or server
        control file.
    @param use_container unused argument today.  TODO: Enable containers
        on the host during a client side test.
    @param profile_only A boolean that indicates what default profile_only
        mode to use in the control file. Passing None will generate a
        control file that does not explicitly set the default mode at all.
    @param upload_kernel_config: if enabled it will generate server control
            file code that uploads the kernel config file to the client and
            tells the client of the new (local) path when compiling the kernel;
            the tests must be server side tests
    @param db_tests: if True, the test object can be found in the database
                     backing the test model. In this case, tests is a tuple
                     of test IDs which are used to retrieve the test objects
                     from the database. If False, tests is a tuple of test
                     dictionaries stored client-side in the AFE.

    @returns a dict with the following keys:
        control_file: str, The control file text.
        is_server: bool, is the control file a server-side control file?
        synch_count: How many machines the job uses per autoserv execution.
            synch_count == 1 means the job is asynchronous.
        dependencies: A list of the names of labels on which the job depends.
    """
    if not tests and not client_control_file:
        return dict(control_file='', is_server=False, synch_count=1,
                    dependencies=[])

    cf_info, test_objects, profiler_objects, label = (
        rpc_utils.prepare_generate_control_file(tests, kernel, label,
                                                profilers, db_tests))
    cf_info['control_file'] = control_file.generate_control(
        tests=test_objects, kernels=kernel, platform=label,
        profilers=profiler_objects, is_server=cf_info['is_server'],
        client_control_file=client_control_file, profile_only=profile_only,
        upload_kernel_config=upload_kernel_config)
    return cf_info

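# A hedged sketch; 'sleeptest' stands in for any test known to the AFE:
#
#   cf = generate_control_file(tests=['sleeptest'])
#   # cf['control_file'] holds the generated text; cf['is_server'] tells you
#   # whether it must run server-side.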

def create_parameterized_job(name, priority, test, parameters, kernel=None,
                             label=None, profilers=(), profiler_parameters=None,
                             use_container=False, profile_only=None,
                             upload_kernel_config=False, hosts=(),
                             meta_hosts=(), one_time_hosts=(),
                             atomic_group_name=None, synch_count=None,
                             is_template=False, timeout=None,
                             timeout_mins=None, max_runtime_mins=None,
                             run_verify=False, email_list='', dependencies=(),
                             reboot_before=None, reboot_after=None,
                             parse_failed_repair=None, hostless=False,
                             keyvals=None, drone_set=None, run_reset=True,
                             require_ssp=None):
    """
    Creates and enqueues a parameterized job.

    Most parameters are a combination of the parameters for
    generate_control_file() and create_job(), with the exception of:

    @param test name or ID of the test to run
    @param parameters a map of parameter name ->
                          tuple of (param value, param type)
    @param profiler_parameters a dictionary of parameters for the profilers:
                                   key: profiler name
                                   value: dict of param name -> tuple of
                                                                (param value,
                                                                 param type)
    """
    # Save the values of the passed arguments here. What we're going to do with
    # them is pass them all to rpc_utils.get_create_job_common_args(), which
    # will extract the subset of these arguments that apply for
    # rpc_utils.create_job_common(), which we then pass in to that function.
    args = locals()

    # Set up the parameterized job configs
    test_obj = models.Test.smart_get(test)
    control_type = test_obj.test_type

    try:
        label = models.Label.smart_get(label)
    except models.Label.DoesNotExist:
        label = None

    kernel_objs = models.Kernel.create_kernels(kernel)
    profiler_objs = [models.Profiler.smart_get(profiler)
                     for profiler in profilers]

    parameterized_job = models.ParameterizedJob.objects.create(
            test=test_obj, label=label, use_container=use_container,
            profile_only=profile_only,
            upload_kernel_config=upload_kernel_config)
    parameterized_job.kernels.add(*kernel_objs)

    for profiler in profiler_objs:
        parameterized_profiler = models.ParameterizedJobProfiler.objects.create(
                parameterized_job=parameterized_job,
                profiler=profiler)
        profiler_params = profiler_parameters.get(profiler.name, {})
        for name, (value, param_type) in profiler_params.iteritems():
            models.ParameterizedJobProfilerParameter.objects.create(
                    parameterized_job_profiler=parameterized_profiler,
                    parameter_name=name,
                    parameter_value=value,
                    parameter_type=param_type)

    try:
        for parameter in test_obj.testparameter_set.all():
            if parameter.name in parameters:
                param_value, param_type = parameters.pop(parameter.name)
                parameterized_job.parameterizedjobparameter_set.create(
                        test_parameter=parameter, parameter_value=param_value,
                        parameter_type=param_type)

        if parameters:
            raise Exception('Extra parameters remain: %r' % parameters)

        return rpc_utils.create_job_common(
                parameterized_job=parameterized_job.id,
                control_type=control_type,
                **rpc_utils.get_create_job_common_args(args))
    except:
        parameterized_job.delete()
        raise


def create_job_page_handler(name, priority, control_file, control_type,
                            image=None, hostless=False, firmware_rw_build=None,
                            firmware_ro_build=None, test_source_build=None,
                            **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param image: ChromeOS build to be installed on the DUT. Defaults to None.
    @param firmware_rw_build: Firmware build to update RW firmware. Defaults to
                              None, i.e., RW firmware will not be updated.
    @param firmware_ro_build: Firmware build to update RO firmware. Defaults to
                              None, i.e., RO firmware will not be updated.
    @param test_source_build: Build to be used to retrieve test code. Defaults
                              to None.
    @param kwargs extra args that will be required by create_suite_job or
                  create_job.

    @returns The created Job id number.
    """
    control_file = rpc_utils.encode_ascii(control_file)
    if not control_file:
        raise model_logic.ValidationError({
                'control_file' : "Control file cannot be empty"})

    if image and hostless:
        builds = {}
        builds[provision.CROS_VERSION_PREFIX] = image
        if firmware_rw_build:
            builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
        if firmware_ro_build:
            builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
        return site_rpc_interface.create_suite_job(
                name=name, control_file=control_file, priority=priority,
                builds=builds, test_source_build=test_source_build, **kwargs)
    return create_job(name, priority, control_file, control_type, image=image,
                      hostless=hostless, **kwargs)

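# A hedged sketch; the build name is illustrative and suite_control_file is
# a hypothetical variable holding control file text:
#
#   # With image and hostless=True this routes to create_suite_job;
#   # otherwise it falls through to create_job below.
#   job_id = create_job_page_handler(
#           'bvt-cq', priorities.Priority.DEFAULT, suite_control_file,
#           'Server', image='lumpy-release/R40-6457.21.0', hostless=True)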

@rpc_utils.route_rpc_to_master
def create_job(name, priority, control_file, control_type,
               hosts=(), meta_hosts=(), one_time_hosts=(),
               atomic_group_name=None, synch_count=None, is_template=False,
               timeout=None, timeout_mins=None, max_runtime_mins=None,
               run_verify=False, email_list='', dependencies=(),
               reboot_before=None, reboot_after=None, parse_failed_repair=None,
               hostless=False, keyvals=None, drone_set=None, image=None,
               parent_job_id=None, test_retry=0, run_reset=True,
               require_ssp=None, args=(), **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param synch_count How many machines the job uses per autoserv execution.
        synch_count == 1 means the job is asynchronous.  If an atomic group is
        given this value is treated as a minimum.
    @param is_template If true then create a template job.
    @param timeout Hours after this call returns until the job times out.
    @param timeout_mins Minutes after this call returns until the job times
        out.
    @param max_runtime_mins Minutes from job starting time until job times out
    @param run_verify Should the host be verified before running the test?
    @param email_list String containing emails to mail when the job is done
    @param dependencies List of label names on which this job depends
    @param reboot_before Never, If dirty, or Always
    @param reboot_after Never, If all tests passed, or Always
    @param parse_failed_repair if true, results of failed repairs launched by
        this job will be parsed as part of the job.
    @param hostless if true, create a hostless job
    @param keyvals dict of keyvals to associate with the job
    @param hosts List of hosts to run job on.
    @param meta_hosts List where each entry is a label name, and for each entry
        one host will be chosen from that label to run the job on.
    @param one_time_hosts List of hosts not in the database to run the job on.
    @param atomic_group_name The name of an atomic group to schedule the job on.
    @param drone_set The name of the drone set to run this test on.
    @param image OS image to install before running job.
    @param parent_job_id id of a job considered to be parent of created job.
    @param test_retry Number of times to retry test if the test did not
        complete successfully. (optional, default: 0)
    @param run_reset Should the host be reset before running the test?
    @param require_ssp Set to True to require server-side packaging to run the
                       test. If it's set to None, drone will still try to run
                       the server side with server-side packaging. If the
                       autotest-server package doesn't exist for the build or
                       image is not set, drone will run the test without server-
                       side packaging. Default is None.
    @param args A list of args to be injected into control file.
    @param kwargs extra keyword args. NOT USED.

    @returns The created Job id number.
    """
    if args:
        control_file = tools.inject_vars({'args': args}, control_file)

    if image is None:
        return rpc_utils.create_job_common(
                **rpc_utils.get_create_job_common_args(locals()))

    # Translate the image name, in case it's a relative build name.
    ds = dev_server.ImageServer.resolve(image)
    image = ds.translate(image)

    # When an image is supplied, use a known parameterized test already in the
    # database to pass the OS image path from the front end, through the
    # scheduler, and finally to autoserv as the --image parameter.

    # The test autoupdate_ParameterizedJob is in afe_autotests and used to
    # instantiate a Test object and from there a ParameterizedJob.
    known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob')
    known_parameterized_job = models.ParameterizedJob.objects.create(
            test=known_test_obj)

    # autoupdate_ParameterizedJob has a single parameter, the image parameter,
    # stored in the table afe_test_parameters.  We retrieve and set this
    # instance of the parameter to the OS image path.
    image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj,
                                                           name='image')
    known_parameterized_job.parameterizedjobparameter_set.create(
            test_parameter=image_parameter, parameter_value=image,
            parameter_type='string')

    # TODO(crbug.com/502638): save firmware build etc to parameterized_job.

    # By passing a parameterized_job to create_job_common the job entry in
    # the afe_jobs table will have the field parameterized_job_id set.
    # The scheduler uses this id in the afe_parameterized_jobs table to
    # match this job to our known test, and then with the
    # afe_parameterized_job_parameters table to get the actual image path.
    return rpc_utils.create_job_common(
            parameterized_job=known_parameterized_job.id,
            **rpc_utils.get_create_job_common_args(locals()))

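# A hedged sketch of a plain (no-image) job; the hostname and the
# control_file_text variable are hypothetical:
#
#   job_id = create_job('sleeptest-run', priorities.Priority.DEFAULT,
#                       control_file_text, 'Client',
#                       hosts=['chromeos1-row1-host1'])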

def abort_host_queue_entries(**filter_data):
    """\
    Abort a set of host queue entries.

    @return: A list of dictionaries, each containing information
             about an aborted HQE.
    """
    query = models.HostQueueEntry.query_objects(filter_data)

    # Don't allow aborts on:
    #   1. Jobs that have already completed (whether or not they were aborted)
    #   2. Jobs that have already been aborted (but may not have completed)
    query = query.filter(complete=False).filter(aborted=False)
    models.AclGroup.check_abort_permissions(query)
    host_queue_entries = list(query.select_related())
    rpc_utils.check_abort_synchronous_jobs(host_queue_entries)

    models.HostQueueEntry.abort_host_queue_entries(host_queue_entries)
    hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id,
                 'Job name': hqe.job.name} for hqe in host_queue_entries]
    return hqe_info

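# A hedged sketch; the job id and the Django-style filter keyword are
# illustrative assumptions:
#
#   # Abort every incomplete, not-yet-aborted entry of job 123.
#   aborted = abort_host_queue_entries(job__id=123)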

def abort_special_tasks(**filter_data):
    """\
    Abort the special task, or tasks, specified in the filter.
    """
    query = models.SpecialTask.query_objects(filter_data)
    special_tasks = query.filter(is_active=True)
    for task in special_tasks:
        task.abort()


def _call_special_tasks_on_hosts(task, hosts):
    """\
    Schedules a set of hosts for a special task.

    @returns A list of hostnames that a special task was created for.
    """
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
    if shard_host_map and not utils.is_shard():
        raise ValueError('The following hosts are on shards, please '
                         'follow the link to the shards and create jobs '
                         'there instead. %s.' % shard_host_map)
    for host in hosts:
        models.SpecialTask.schedule_special_task(host, task)
    return sorted(host.hostname for host in hosts)


def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
    """Forward special tasks to the corresponding shards.

    On the master, when special tasks are fired on hosts that live on shards,
    forward the RPC to the corresponding shards.

    On a shard, create the special task records in the local DB.

    @param task: Enum value of frontend.afe.models.SpecialTask.Task
    @param rpc: RPC name to forward.
    @param filter_data: Filter keywords to be used for DB query.

    @return: A list of hostnames that a special task was created for.
    """
    hosts = models.Host.query_objects(filter_data)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts, rpc_hostnames=True)

    # Filter out hosts on a shard from those on the master, forward
    # rpcs to the shard with an additional hostname__in filter, and
    # create a local SpecialTask for each remaining host.
    if shard_host_map and not utils.is_shard():
        hosts = [h for h in hosts if h.shard is None]
        for shard, hostnames in shard_host_map.iteritems():

            # The main client of this module is the frontend website, and
            # it invokes it with an 'id' or an 'id__in' filter. Regardless,
            # the 'hostname' filter should narrow down the list of hosts on
            # each shard even though we supply all the ids in filter_data.
            # This method uses hostname instead of id because it fits better
            # with the overall architecture of redirection functions in
            # rpc_utils.
            shard_filter = filter_data.copy()
            shard_filter['hostname__in'] = hostnames
            rpc_utils.run_rpc_on_multiple_hostnames(
                    rpc, [shard], **shard_filter)

    # There is a race condition here if someone assigns a shard to one of these
    # hosts before we create the task. The host will stay on the master if:
    # 1. The host is not Ready
    # 2. The host is Ready but has a task
    # But if the host is Ready and doesn't have a task yet, it will get sent
    # to the shard as we're creating a task here.

    # Given that we only rarely verify Ready hosts it isn't worth putting this
    # entire method in a transaction. The worst case scenario is that we have
    # a verify running on a Ready host while the shard is using it; if the
    # verify fails no subsequent tasks will be created against the host on the
    # master, and verifies are safe enough that this is OK.
    return _call_special_tasks_on_hosts(task, hosts)


def reverify_hosts(**filter_data):
    """\
    Schedules a set of hosts for verify.

    @returns A list of hostnames that a verify task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)


def repair_hosts(**filter_data):
    """\
    Schedules a set of hosts for repair.

    @returns A list of hostnames that a repair task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data)


def get_jobs(not_yet_run=False, running=False, finished=False,
             suite=False, sub=False, standalone=False, **filter_data):
    """\
    Extra status filter args for get_jobs:
    -not_yet_run: Include only jobs that have not yet started running.
    -running: Include only jobs that have started running but for which not
    all hosts have completed.
    -finished: Include only jobs for which all hosts have completed (or
    aborted).

    Extra type filter args for get_jobs:
    -suite: Include only jobs with child jobs.
    -sub: Include only jobs with a parent job.
    -standalone: Include only jobs with no child or parent jobs.
    At most one of these three fields should be specified.
    """
    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                    running,
                                                    finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    job_dicts = []
    jobs = list(models.Job.query_objects(filter_data))
    models.Job.objects.populate_relationships(jobs, models.Label,
                                              'dependencies')
    models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
    for job in jobs:
        job_dict = job.get_object_dict()
        job_dict['dependencies'] = ','.join(label.name
                                            for label in job.dependencies)
        job_dict['keyvals'] = dict((keyval.key, keyval.value)
                                   for keyval in job.keyvals)
        if job.parameterized_job:
            job_dict['image'] = get_parameterized_autoupdate_image_url(job)
        job_dicts.append(job_dict)
    return rpc_utils.prepare_for_serialization(job_dicts)

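# A hedged sketch combining a status filter with a type filter; the owner
# mirrors the module docstring's examples:
#
#   # Queued suite jobs owned by one user.
#   jobs = get_jobs(not_yet_run=True, suite=True, owner='showard')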

def get_num_jobs(not_yet_run=False, running=False, finished=False,
                 suite=False, sub=False, standalone=False,
                 **filter_data):
    """\
    See get_jobs() for documentation of extra filter parameters.
    """
    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                    running,
                                                    finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    return models.Job.query_count(filter_data)


def get_jobs_summary(**filter_data):
    """\
    Like get_jobs(), but adds 'status_counts' and 'result_counts' fields.

    The 'status_counts' field is a dictionary mapping status strings to the
    number of hosts currently with that status, i.e. {'Queued' : 4,
    'Running' : 2}.

    The 'result_counts' field is piped to tko's rpc_interface and has the
    return format specified under get_group_counts.
    """
    jobs = get_jobs(**filter_data)
    ids = [job['id'] for job in jobs]
    all_status_counts = models.Job.objects.get_status_counts(ids)
    for job in jobs:
        job['status_counts'] = all_status_counts[job['id']]
        job['result_counts'] = tko_rpc_interface.get_status_counts(
                ['afe_job_id', 'afe_job_id'],
                header_groups=[['afe_job_id'], ['afe_job_id']],
                **{'afe_job_id': job['id']})
    return rpc_utils.prepare_for_serialization(jobs)


def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
    """\
    Retrieves all the information needed to clone a job.
    """
    job = models.Job.objects.get(id=id)
    job_info = rpc_utils.get_job_info(job,
                                      preserve_metahosts,
                                      queue_entry_filter_data)

    host_dicts = []
    for host in job_info['hosts']:
        host_dict = get_hosts(id=host.id)[0]
        other_labels = host_dict['labels']
        if host_dict['platform']:
            other_labels.remove(host_dict['platform'])
        host_dict['other_labels'] = ', '.join(other_labels)
        host_dicts.append(host_dict)

    for host in job_info['one_time_hosts']:
        host_dict = dict(hostname=host.hostname,
                         id=host.id,
                         platform='(one-time host)',
                         locked_text='')
        host_dicts.append(host_dict)

    # convert keys from Label objects to strings (names of labels)
    meta_host_counts = dict((meta_host.name, count) for meta_host, count
                            in job_info['meta_host_counts'].iteritems())

    info = dict(job=job.get_object_dict(),
                meta_host_counts=meta_host_counts,
                hosts=host_dicts)
    info['job']['dependencies'] = job_info['dependencies']
    if job_info['atomic_group']:
        info['atomic_group_name'] = (job_info['atomic_group']).name
    else:
        info['atomic_group_name'] = None
    info['hostless'] = job_info['hostless']
    info['drone_set'] = job.drone_set and job.drone_set.name

    if job.parameterized_job:
        info['job']['image'] = get_parameterized_autoupdate_image_url(job)

    return rpc_utils.prepare_for_serialization(info)


# host queue entries

1277def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
1278    """\
1279    @returns A sequence of nested dictionaries of host and job information.
1280    """
    filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
                                                   'started_on__lte',
                                                   start_time,
                                                   end_time,
                                                   **filter_data)
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.HostQueueEntry.query_objects(filter_data),
            ('host', 'atomic_group', 'job'))

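# Example: entries that started within a time window (the timestamps and the
# job id filter are illustrative):
#
#     get_host_queue_entries(start_time='2014-09-01 00:00:00',
#                            end_time='2014-09-02 00:00:00',
#                            job__id=42)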

def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
    """\
    Get the number of host queue entries matching the given filters.
    """
    filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
                                                   'started_on__lte',
                                                   start_time,
                                                   end_time,
                                                   **filter_data)
    return models.HostQueueEntry.query_count(filter_data)


def get_hqe_percentage_complete(**filter_data):
    """
    Computes the fraction of host queue entries matching the given filter data
    that are complete.
    """
    query = models.HostQueueEntry.query_objects(filter_data)
    complete_count = query.filter(complete=True).count()
    total_count = query.count()
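    # An empty result set is reported as fully complete rather than
    # triggering a divide-by-zero below.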
    if total_count == 0:
        return 1.0
    return float(complete_count) / total_count


# special tasks

def get_special_tasks(**filter_data):
    """Get special task entries from the local database.

    Query the special tasks table for tasks matching the given
    `filter_data`, and return a list of the results.  No attempt is
    made to forward the call to shards; the buck will stop here.
    The caller is expected to know the target shard for such reasons
    as:
      * The caller is a service (such as gs_offloader) configured
        to operate on behalf of one specific shard, and no other.
      * The caller has a host as a parameter, and knows that this is
        the shard assigned to that host.

    @param filter_data  Filter keywords to pass to the underlying
                        database query.

    """
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.SpecialTask.query_objects(filter_data),
            ('host', 'queue_entry'))


def get_host_special_tasks(host_id, **filter_data):
    """Get special task entries for a given host.

    Query the special tasks table for tasks that ran on the host
    given by `host_id` and matching the given `filter_data`.
    Return a list of the results.  If the host is assigned to a
    shard, forward this call to that shard.

    @param host_id      Id in the database of the target host.
    @param filter_data  Filter keywords to pass to the underlying
                        database query.

    """
    # Retrieve host data even if the host is in an invalid state.
    host = models.Host.smart_get(host_id, False)
    if not host.shard:
        return get_special_tasks(host_id=host_id, **filter_data)
    else:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable.  So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
        return shard_afe.run('get_special_tasks',
                             host_id=host_id, **filter_data)

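# Sketch of the shard-forwarding behavior above; the host id and task filter
# are hypothetical:
#
#     # Served from the local database when the host has no shard; otherwise
#     # the same call is replayed on the owning shard via AFE.run().
#     tasks = get_host_special_tasks(host_id=1234, task='Repair')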

def get_num_special_tasks(**kwargs):
    """Get the number of special task entries from the local database.

    Query the special tasks table for tasks matching the given 'kwargs',
    and return the number of results. No attempt is made to forward
    the call to shards; the buck will stop here.

    @param kwargs    Filter keywords to pass to the underlying database query.

    """
    return models.SpecialTask.query_count(kwargs)


def get_host_num_special_tasks(host, **kwargs):
    """Get the number of special task entries for a given host.

    Query the special tasks table for tasks that ran on the host
    given by 'host' and matching the given 'kwargs'.
    Return the number of results.  If the host is assigned to a
    shard, forward this call to that shard.

    @param host      id or name of a host. More often a hostname.
    @param kwargs    Filter keywords to pass to the underlying database query.

    """
    # Retrieve host data even if the host is in an invalid state.
    host_model = models.Host.smart_get(host, False)
    if not host_model.shard:
        return get_num_special_tasks(host=host, **kwargs)
    else:
        shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname())
        return shard_afe.run('get_num_special_tasks', host=host, **kwargs)


def get_status_task(host_id, end_time):
    """Get the "status task" for a host from the local shard.

    Returns a single special task representing the given host's
    "status task".  The status task is a completed special task that
    identifies whether the corresponding host was working or broken
    when it completed.  A successful task indicates a working host;
    a failed task indicates broken.

    This call will not be forwarded to a shard; the receiving server
    must be the shard that owns the host.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    tasklist = rpc_utils.prepare_rows_as_nested_dicts(
            status_history.get_status_task(host_id, end_time),
            ('host', 'queue_entry'))
    return tasklist[0] if tasklist else None

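# Example of the expected return shape (field values are illustrative; the
# 'success' key mirrors models.SpecialTask.success):
#
#     task = get_status_task(host_id=1234, end_time='2014-09-01 00:00:00')
#     if task and task['success']:
#         pass  # the host was working as of the given time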

def get_host_status_task(host_id, end_time):
    """Get the "status task" for a host from its owning shard.

    Finds the given host's owning shard, and forwards to it a call
    to `get_status_task()` (see above).

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    host = models.Host.smart_get(host_id)
    if not host.shard:
        return get_status_task(host_id, end_time)
    else:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable.  So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
        return shard_afe.run('get_status_task',
                             host_id=host_id, end_time=end_time)


def get_host_diagnosis_interval(host_id, end_time, success):
    """Find a "diagnosis interval" for a given host.

    A "diagnosis interval" identifies a start and end time where
    the host went from "working" to "broken", or vice versa.  The
    interval's starting time is the starting time of the last status
    task with the old status; the end time is the finish time of the
    first status task with the new status.

    This routine finds the most recent diagnosis interval for the
    given host prior to `end_time`, with a starting status matching
    `success`.  If `success` is true, the interval will start with a
    successful status task; if false, the interval will start with a
    failed status task.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the diagnosis interval.
    @param success      Whether the diagnosis interval should start
                        with a successful or failed status task.

    @return A list of two strings.  The first is the timestamp for
            the beginning of the interval; the second is the
            timestamp for the end.  If the host has never changed
            state, the list is empty.

    """
    host = models.Host.smart_get(host_id)
    if not host.shard or utils.is_shard():
        return status_history.get_diagnosis_interval(
                host_id, end_time, success)
    else:
        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
        return shard_afe.get_host_diagnosis_interval(
                host_id, end_time, success)

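# The return value is a pair of timestamp strings; e.g. (made-up values):
#
#     get_host_diagnosis_interval(1234, '2014-09-01 00:00:00', success=True)
#     # -> ['2014-08-30 10:15:00', '2014-08-30 11:02:30'],
#     #    or [] if the host never changed state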

# support for host detail view

def get_host_queue_entries_and_special_tasks(host, query_start=None,
                                             query_limit=None, start_time=None,
                                             end_time=None):
    """
    @returns An interleaved list of HostQueueEntries and SpecialTasks,
            in approximate run order.  Each dict contains keys for type, host,
            job, status, started_on, execution_path, and ID.
    """
    total_limit = None
    if query_limit is not None:
        # query_start may be None, in which case slicing starts from 0.
        total_limit = (query_start or 0) + query_limit
    filter_data_common = {'host': host,
                          'query_limit': total_limit,
                          'sort_by': ['-id']}

    filter_data_special_tasks = rpc_utils.inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **filter_data_common)

    queue_entries = get_host_queue_entries(
            start_time, end_time, **filter_data_common)
    special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)

    interleaved_entries = rpc_utils.interleave_entries(queue_entries,
                                                       special_tasks)
    if query_start is not None:
        interleaved_entries = interleaved_entries[query_start:]
    if query_limit is not None:
        interleaved_entries = interleaved_entries[:query_limit]
    return rpc_utils.prepare_host_queue_entries_and_special_tasks(
            interleaved_entries, queue_entries)

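# Pagination sketch: the limit is applied to the merged stream of queue
# entries and special tasks, so both queries over-fetch (query_start +
# query_limit rows) and the slicing happens afterwards.  Illustrative call:
#
#     get_host_queue_entries_and_special_tasks('myhost', query_start=20,
#                                              query_limit=10)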

def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
                                                 end_time=None):
    """\
    Get the combined number of host queue entries and special tasks for a
    host, optionally bounded by start_time and end_time.
    """
    filter_data_common = {'host': host}

    filter_data_queue_entries, filter_data_special_tasks = (
            rpc_utils.inject_times_to_hqe_special_tasks_filters(
                    filter_data_common, start_time, end_time))

    return (models.HostQueueEntry.query_count(filter_data_queue_entries)
            + get_host_num_special_tasks(**filter_data_special_tasks))


# recurring run

def get_recurring(**filter_data):
    """Get recurring runs as nested dicts with 'job' and 'owner' expanded."""
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.RecurringRun.query_objects(filter_data),
            ('job', 'owner'))


def get_num_recurring(**filter_data):
    """Get the number of recurring runs matching the given filters."""
    return models.RecurringRun.query_count(filter_data)


def delete_recurring_runs(**filter_data):
    """Delete all recurring runs matching the given filters."""
    to_delete = models.RecurringRun.query_objects(filter_data)
    to_delete.delete()


def create_recurring_run(job_id, start_date, loop_period, loop_count):
    """\
    Create a recurring run of an existing job, owned by the current user.

    @param job_id: Database id of the job to repeat.
    @param start_date: Time of the first run.
    @param loop_period: Period between runs.
    @param loop_count: Number of times to run the job.
    """
    owner = models.User.current_user().login
    job = models.Job.objects.get(id=job_id)
    return job.create_recurring_job(start_date=start_date,
                                    loop_period=loop_period,
                                    loop_count=loop_count,
                                    owner=owner)
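
# Illustrative call (the job id, date, and loop values are hypothetical, and
# loop_period is assumed here to be in seconds):
#
#     create_recurring_run(42, '2014-09-01 00:00:00', 3600, 10)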


# other

def echo(data=""):
    """\
    Returns the passed-in string, as a basic check that RPC calls can be
    made successfully.
    """
    return data


def get_motd():
    """\
    Returns the message of the day as a string.
    """
    return rpc_utils.get_motd()


def get_static_data():
    """\
    Returns a dictionary of data that changes rarely and is otherwise
    inaccessible.  This includes:

    priorities: List of job priority choices.
    default_priority: Default priority value for new jobs.
    users: Sorted list of all users.
    labels: Sorted list of labels, excluding version labels (those whose
            names start with 'cros-version', 'fw-version', etc.).
    atomic_groups: Sorted list of all atomic groups.
    tests: Sorted list of all tests.
    profilers: Sorted list of all profilers.
    current_user: Logged-in username.
    host_statuses: Sorted list of possible Host statuses.
    job_statuses: Sorted list of possible HostQueueEntry statuses.
    job_timeout_mins_default: The default job timeout length in minutes.
    job_max_runtime_mins_default: The default maximum job runtime in minutes.
    parse_failed_repair_default: Default value for the parse_failed_repair job
            option.
    reboot_before_options: A list of valid RebootBefore string enums.
    reboot_after_options: A list of valid RebootAfter string enums.
    motd: Server's message of the day.
    status_dictionary: A mapping from one-word job status names to more
            informative descriptions.
    """

    job_fields = models.Job.get_field_dict()
    default_drone_set_name = models.DroneSet.default_drone_set_name()
    drone_sets = ([default_drone_set_name] +
                  sorted(drone_set.name for drone_set in
                         models.DroneSet.objects.exclude(
                                 name=default_drone_set_name)))

    result = {}
    result['priorities'] = priorities.Priority.choices()
    result['default_priority'] = 'Default'
    result['max_schedulable_priority'] = priorities.Priority.DEFAULT
    result['users'] = get_users(sort_by=['login'])
1621
1622    label_exclude_filters = [{'name__startswith': 'cros-version'},
1623                             {'name__startswith': 'fw-version'},
1624                             {'name__startswith': 'fwrw-version'},
1625                             {'name__startswith': 'fwro-version'},
1626                             {'name__startswith': 'ab-version'},
1627                             {'name__startswith': 'testbed-version'}]
1628    result['labels'] = get_labels(
1629        label_exclude_filters,
1630        sort_by=['-platform', 'name'])
1631
1632    result['atomic_groups'] = get_atomic_groups(sort_by=['name'])
1633    result['tests'] = get_tests(sort_by=['name'])
1634    result['profilers'] = get_profilers(sort_by=['name'])
1635    result['current_user'] = rpc_utils.prepare_for_serialization(
1636        models.User.current_user().get_object_dict())
1637    result['host_statuses'] = sorted(models.Host.Status.names)
1638    result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
1639    result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
1640    result['job_max_runtime_mins_default'] = (
1641        models.Job.DEFAULT_MAX_RUNTIME_MINS)
1642    result['parse_failed_repair_default'] = bool(
1643        models.Job.DEFAULT_PARSE_FAILED_REPAIR)
1644    result['reboot_before_options'] = model_attributes.RebootBefore.names
1645    result['reboot_after_options'] = model_attributes.RebootAfter.names
1646    result['motd'] = rpc_utils.get_motd()
1647    result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
1648    result['drone_sets'] = drone_sets
1649    result['parameterized_jobs'] = models.Job.parameterized_jobs_enabled()
1650
1651    result['status_dictionary'] = {"Aborted": "Aborted",
1652                                   "Verifying": "Verifying Host",
1653                                   "Provisioning": "Provisioning Host",
1654                                   "Pending": "Waiting on other hosts",
1655                                   "Running": "Running autoserv",
1656                                   "Completed": "Autoserv completed",
1657                                   "Failed": "Failed to complete",
1658                                   "Queued": "Queued",
1659                                   "Starting": "Next in host's queue",
1660                                   "Stopped": "Other host(s) failed verify",
1661                                   "Parsing": "Awaiting parse of final results",
1662                                   "Gathering": "Gathering log files",
1663                                   "Template": "Template job for recurring run",
1664                                   "Waiting": "Waiting for scheduler action",
1665                                   "Archiving": "Archiving results",
1666                                   "Resetting": "Resetting hosts"}
1667
1668    result['wmatrix_url'] = rpc_utils.get_wmatrix_url()
1669    result['is_moblab'] = bool(utils.is_moblab())
1670
1671    return result
1672
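# Typical client-side use of get_static_data() (keys as documented above):
#
#     static = get_static_data()
#     static['priorities']   # job priority choices
#     static['motd']         # message of the day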

def get_server_time():
    """Returns the current server time as a 'YYYY-MM-DD HH:MM' string."""
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
1676