# rpc_interface.py @ revision b5b8b4f981036971c619b26956c8847140eabd35
# pylint: disable-msg=C0111

"""\
Functions to expose over the RPC interface.

For all modify* and delete* functions that ask for an 'id' parameter to
identify the object to operate on, the id may be either
 * the database row ID
 * the name of the object (label name, hostname, user login, etc.)
 * a dictionary of uniquely identifying fields (this option should seldom
   be used)

When specifying foreign key fields (i.e. adding hosts to a label, or adding
users to an ACL group), the given value may be either the database row ID or the
name of the object.

All get* functions return lists of dictionaries.  Each dictionary represents one
object and maps field names to values.

Some examples:
modify_host(2, hostname='myhost') # modify hostname of host with database ID 2
modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2'
modify_test('sleeptest', test_type='Client', params=', seconds=60')
delete_acl_group(1) # delete by ID
delete_acl_group('Everyone') # delete by name
acl_group_add_users('Everyone', ['mbligh', 'showard'])
get_jobs(owner='showard', status='Queued')

See doctests/001_rpc_test.txt for (lots) more examples.
"""

__author__ = 'showard@google.com (Steve Howard)'

import ast
import datetime
import logging
import sys

from django.db.models import Count
import common
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import control_file, rpc_utils
from autotest_lib.frontend.afe import models, model_logic, model_attributes
from autotest_lib.frontend.afe import site_rpc_interface
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
from autotest_lib.server import frontend
from autotest_lib.server import utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import tools
from autotest_lib.server.lib import status_history


_timer = autotest_stats.Timer('rpc_interface')

def get_parameterized_autoupdate_image_url(job):
    """Get the parameterized autoupdate image url from a parameterized job."""
    known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob')
    image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj,
                                                           name='image')
    para_set = job.parameterized_job.parameterizedjobparameter_set
    job_test_para = para_set.get(test_parameter=image_parameter)
    return job_test_para.parameter_value


# labels

def modify_label(id, **data):
    """Modify a label.

    @param id: id or name of a label. More often a label name.
    @param data: New data for a label.
    """
    label_model = models.Label.smart_get(id)
    label_model.update_object(data)

    # Master forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False,
                             id=id, **data)


def delete_label(id):
    """Delete a label.

    @param id: id or name of a label. More often a label name.
    """
    label_model = models.Label.smart_get(id)
    # Hosts that have the label to be deleted. Save this info before
    # the label is deleted to use it later.
    hosts = []
    for h in label_model.host_set.all():
        hosts.append(models.Host.smart_get(h.id))
    label_model.delete()

    # Master forwards the RPC to shards
    if not utils.is_shard():
        rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id)


def add_label(name, ignore_exception_if_exists=False, **kwargs):
    """Adds a new label of a given name.

    @param name: label name.
    @param ignore_exception_if_exists: If True and the exception was
        thrown due to a duplicate label name when adding a label,
        then suppress the exception. Default is False.
    @param kwargs: keyword args that store more info about a label
        other than the name.
    @return: int/long id of a new label.
    """
    # models.Label.add_object() throws model_logic.ValidationError
    # when it is given a label name that already exists.
    # However, ValidationError can be thrown with different errors,
    # and those errors should be propagated up the call chain.
    try:
        label = models.Label.add_object(name=name, **kwargs)
    except:
        exc_info = sys.exc_info()
        if ignore_exception_if_exists:
            label = rpc_utils.get_label(name)
            # If the exception was raised for a reason other than a
            # duplicated "name", then re-raise the original exception.
            if label is None:
                raise exc_info[0], exc_info[1], exc_info[2]
        else:
            raise exc_info[0], exc_info[1], exc_info[2]
    return label.id
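
# Illustrative sketch (comment only, not executed on import; the label name
# is hypothetical):
#
#   label_id = add_label('pool:example', ignore_exception_if_exists=True)
#   # A second call with the same name returns the existing label's id
#   # instead of raising model_logic.ValidationError.
#   same_id = add_label('pool:example', ignore_exception_if_exists=True)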


def add_label_to_hosts(id, hosts):
    """Adds a label of the given id to the given hosts only in local DB.

    @param id: id or name of a label. More often a label name.
    @param hosts: The hostnames of hosts that need the label.

    @raises models.Label.DoesNotExist: If the label with id doesn't exist.
    """
    label = models.Label.smart_get(id)
    host_objs = models.Host.smart_get_bulk(hosts)
    if label.platform:
        models.Host.check_no_platform(host_objs)
    # Ensure a host has no more than one board label with it.
    if label.name.startswith('board:'):
        models.Host.check_board_labels_allowed(host_objs, [label.name])
    label.host_set.add(*host_objs)


def _create_label_everywhere(id, hosts):
    """
    Yet another method to create labels.

    ALERT! This method should be run only on the master, not on shards!
    DO NOT RUN THIS ON A SHARD!!!  Deputies will hate you if you do!!!

    This method exists primarily to serve label_add_hosts() and
    host_add_labels().  Basically it pulls out the label check/add logic
    from label_add_hosts() into this nice method that not only creates
    the label but also tells the shards that service the hosts to also
    create the label.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    try:
        label = models.Label.smart_get(id)
    except models.Label.DoesNotExist:
        # This matches the type checks in smart_get, which is a hack
        # in and of itself. The aim here is to create any non-existent
        # label, which we cannot do if the 'id' specified isn't a label name.
        if isinstance(id, basestring):
            label = models.Label.smart_get(add_label(id))
        else:
            raise ValueError('Label id (%s) does not exist. Please specify '
                             'the argument, id, as a string (label name).'
                             % id)

    # Make sure the label exists on the shard with the same id
    # as it is on the master.
    # It is possible that the label is already on a shard, because
    # we add a new label only to the shards of hosts that the label
    # is going to be attached to.
    # For example, we add a label L1 to a host in shard S1.
    # The master and S1 will have L1, but other shards won't.
    # Later, when we add the same label L1 to hosts in shards S1 and S2,
    # S1 already has the label but S2 doesn't.
    # S2 should have the new label without any problem.
    # We ignore the exception in such a case.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(
            host_objs, 'add_label', include_hostnames=False,
            name=label.name, ignore_exception_if_exists=True,
            id=label.id, platform=label.platform)


@rpc_utils.route_rpc_to_master
def label_add_hosts(id, hosts):
    """Adds a label with the given id to the given hosts.

    This method should be run only on the master, not on shards.
    The given label will be created if it doesn't exist, provided the `id`
    supplied is a label name, not an int/long id.

    @param id: id or name of a label. More often a label name.
    @param hosts: A list of hostnames or ids. More often hostnames.

    @raises ValueError: If the id specified is an int/long (label id)
                        while the label does not exist.
    """
    # Create the label.
    _create_label_everywhere(id, hosts)

    # Add it to the master.
    add_label_to_hosts(id, hosts)

    # Add it to the shards.
    host_objs = models.Host.smart_get_bulk(hosts)
    rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id)
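
# Illustrative sketch (comment only; label and hostnames are hypothetical):
#
#   # Creates the label on the master and the relevant shards if needed,
#   # then attaches it to both hosts everywhere.
#   label_add_hosts('pool:example', ['host1', 'host2'])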


def remove_label_from_hosts(id, hosts):
    """Removes a label of the given id from the given hosts only in local DB.

    @param id: id or name of a label.
    @param hosts: The hostnames of hosts to remove the label from.
    """
    host_objs = models.Host.smart_get_bulk(hosts)
    models.Label.smart_get(id).host_set.remove(*host_objs)


@rpc_utils.route_rpc_to_master
def label_remove_hosts(id, hosts):
    """Removes a label of the given id from the given hosts.

    This method should be run only on the master, not on shards.

    @param id: id or name of a label.
    @param hosts: A list of hostnames or ids. More often hostnames.
    """
    host_objs = models.Host.smart_get_bulk(hosts)
    remove_label_from_hosts(id, hosts)

    rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id)


def get_labels(exclude_filters=(), **filter_data):
    """\
    @param exclude_filters: A sequence of dictionaries of filters.

    @returns A sequence of nested dictionaries of label information.
    """
    labels = models.Label.query_objects(filter_data)
    for exclude_filter in exclude_filters:
        labels = labels.exclude(**exclude_filter)
    return rpc_utils.prepare_rows_as_nested_dicts(labels, ('atomic_group',))
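
# Illustrative sketch (comment only). Each exclude_filters entry is expanded
# into Django .exclude() keyword arguments, so the usual field lookups apply;
# the prefix below is just an example value:
#
#   labels = get_labels(
#           exclude_filters=[{'name__startswith': 'cros-version:'}])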


# atomic groups

def add_atomic_group(name, max_number_of_machines=None, description=None):
    return models.AtomicGroup.add_object(
            name=name, max_number_of_machines=max_number_of_machines,
            description=description).id


def modify_atomic_group(id, **data):
    models.AtomicGroup.smart_get(id).update_object(data)


def delete_atomic_group(id):
    models.AtomicGroup.smart_get(id).delete()


def atomic_group_add_labels(id, labels):
    label_objs = models.Label.smart_get_bulk(labels)
    models.AtomicGroup.smart_get(id).label_set.add(*label_objs)


def atomic_group_remove_labels(id, labels):
    label_objs = models.Label.smart_get_bulk(labels)
    models.AtomicGroup.smart_get(id).label_set.remove(*label_objs)


def get_atomic_groups(**filter_data):
    return rpc_utils.prepare_for_serialization(
            models.AtomicGroup.list_objects(filter_data))


# hosts

def add_host(hostname, status=None, locked=None, lock_reason='', protection=None):
    if locked and not lock_reason:
        raise model_logic.ValidationError(
            {'locked': 'Please provide a reason for locking when adding host.'})

    return models.Host.add_object(hostname=hostname, status=status,
                                  locked=locked, lock_reason=lock_reason,
                                  protection=protection).id


@rpc_utils.route_rpc_to_master
def modify_host(id, **kwargs):
    """Modify local attributes of a host.

    If this is called on the master but the host is assigned to a shard, this
    will call the `modify_host_local` RPC on the responsible shard. This means
    that if a host is being locked using this function, the change will also
    propagate to shards.
    When this is called on a shard, the shard just routes the RPC to the master
    and does nothing.

    @param id: id of the host to modify.
    @param kwargs: key=value pairs of values to set on the host.
    """
    rpc_utils.check_modify_host(kwargs)
    host = models.Host.smart_get(id)
    try:
        rpc_utils.check_modify_host_locking(host, kwargs)
    except model_logic.ValidationError as e:
        if not kwargs.get('force_modify_locking', False):
            raise
        logging.exception('The following exception will be ignored and lock '
                          'modification will be enforced. %s', e)

    # This is required to make `lock_time` for a host be exactly the same
    # on the master and on a shard.
    if kwargs.get('locked', None) and 'lock_time' not in kwargs:
        kwargs['lock_time'] = datetime.datetime.now()
    host.update_object(kwargs)

    # force_modify_locking is not a field in the database; remove it before
    # fanning out.
    kwargs.pop('force_modify_locking', None)
    rpc_utils.fanout_rpc([host], 'modify_host_local',
                         include_hostnames=False, id=id, **kwargs)
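
# Illustrative sketch (comment only; the hostname and reason are hypothetical).
# Locking through modify_host records `lock_time` on the master and fans the
# identical value out to the host's shard:
#
#   modify_host('host1', locked=True, lock_reason='under investigation')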


def modify_host_local(id, **kwargs):
    """Modify host attributes in local DB.

    @param id: Host id.
    @param kwargs: key=value pairs of values to set on the host.
    """
    models.Host.smart_get(id).update_object(kwargs)


@rpc_utils.route_rpc_to_master
def modify_hosts(host_filter_data, update_data):
    """Modify local attributes of multiple hosts.

    If this is called on the master but one of the hosts that match the
    filters is assigned to a shard, this will call the `modify_hosts_local`
    RPC on the responsible shard.
    When this is called on a shard, the shard just routes the RPC to the master
    and does nothing.

    The filters are always applied on the master, not on the shards. This means
    that if the states of a host differ on the master and a shard, the state on
    the master will be used. For example, say a host was synced to Shard 1, and
    on Shard 1 the status of the host was set to 'Repair Failed'.
    - A call to modify_hosts with host_filter_data={'status': 'Ready'} will
    update the host (both on the shard and on the master), because the state
    of the host as the master knows it is still 'Ready'.
    - A call to modify_hosts with host_filter_data={'status': 'Repair Failed'}
    will not update the host, because the filter doesn't apply on the master.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    update_data = update_data.copy()
    rpc_utils.check_modify_host(update_data)
    hosts = models.Host.query_objects(host_filter_data)

    affected_shard_hostnames = set()
    affected_host_ids = []

    # Check all hosts before changing data for exception safety.
    for host in hosts:
        try:
            rpc_utils.check_modify_host_locking(host, update_data)
        except model_logic.ValidationError as e:
            if not update_data.get('force_modify_locking', False):
                raise
            logging.exception('The following exception will be ignored and '
                              'lock modification will be enforced. %s', e)

        if host.shard:
            affected_shard_hostnames.add(host.shard.rpc_hostname())
            affected_host_ids.append(host.id)

    # This is required to make `lock_time` for a host be exactly the same
    # on the master and on a shard.
    if update_data.get('locked', None) and 'lock_time' not in update_data:
        update_data['lock_time'] = datetime.datetime.now()
    for host in hosts:
        host.update_object(update_data)

    update_data.pop('force_modify_locking', None)
    # Caution: Changing the filter from the original here. See docstring.
    rpc_utils.run_rpc_on_multiple_hostnames(
            'modify_hosts_local', affected_shard_hostnames,
            host_filter_data={'id__in': affected_host_ids},
            update_data=update_data)
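
# Illustrative sketch (comment only; filter and values are hypothetical).
# The filter is evaluated on the master, and the matched hosts' ids are
# forwarded to the shards as an id__in filter:
#
#   modify_hosts(host_filter_data={'status': 'Ready', 'locked': False},
#                update_data={'locked': True, 'lock_reason': 'maintenance'})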


def modify_hosts_local(host_filter_data, update_data):
    """Modify attributes of hosts in local DB.

    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    for host in models.Host.query_objects(host_filter_data):
        host.update_object(update_data)


def add_labels_to_host(id, labels):
    """Adds labels to a given host only in local DB.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    label_objs = models.Label.smart_get_bulk(labels)
    models.Host.smart_get(id).labels.add(*label_objs)


@rpc_utils.route_rpc_to_master
def host_add_labels(id, labels):
    """Adds labels to a given host.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.

    @raises ValidationError: If adding more than one platform/board label.
    """
    # Create the labels on the master/shards.
    for label in labels:
        _create_label_everywhere(label, [id])

    label_objs = models.Label.smart_get_bulk(labels)
    platforms = [label.name for label in label_objs if label.platform]
    boards = [label.name for label in label_objs
              if label.name.startswith('board:')]
    if len(platforms) > 1 or not utils.board_labels_allowed(boards):
        raise model_logic.ValidationError(
            {'labels': ('Adding more than one platform label, or a list of '
                        'non-compatible board labels: %s %s' %
                        (', '.join(platforms), ', '.join(boards)))})

    host_obj = models.Host.smart_get(id)
    if platforms:
        models.Host.check_no_platform([host_obj])
    if boards:
        models.Host.check_board_labels_allowed([host_obj], labels)
    add_labels_to_host(id, labels)

    rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False,
                         id=id, labels=labels)


def remove_labels_from_host(id, labels):
    """Removes labels from a given host only in local DB.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    label_objs = models.Label.smart_get_bulk(labels)
    models.Host.smart_get(id).labels.remove(*label_objs)


@rpc_utils.route_rpc_to_master
def host_remove_labels(id, labels):
    """Removes labels from a given host.

    @param id: id or hostname for a host.
    @param labels: ids or names for labels.
    """
    remove_labels_from_host(id, labels)

    host_obj = models.Host.smart_get(id)
    rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False,
                         id=id, labels=labels)


def get_host_attribute(attribute, **host_filter_data):
    """
    @param attribute: string name of attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    hosts = rpc_utils.get_host_query((), False, False, True, host_filter_data)
    hosts = list(hosts)
    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
                                               'attribute_list')
    host_attr_dicts = []
    for host_obj in hosts:
        for attr_obj in host_obj.attribute_list:
            if attr_obj.attribute == attribute:
                host_attr_dicts.append(attr_obj.get_object_dict())
    return rpc_utils.prepare_for_serialization(host_attr_dicts)


def set_host_attribute(attribute, value, **host_filter_data):
    """
    @param attribute: string name of attribute
    @param value: string, or None to delete an attribute
    @param host_filter_data: filter data to apply to Hosts to choose hosts to
                             act upon
    """
    assert host_filter_data # disallow accidental actions on all hosts
    hosts = models.Host.query_objects(host_filter_data)
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    for host in hosts:
        host.set_or_delete_attribute(attribute, value)

    # Master forwards this RPC to shards.
    if not utils.is_shard():
        rpc_utils.fanout_rpc(hosts, 'set_host_attribute', False,
                attribute=attribute, value=value, **host_filter_data)
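
# Illustrative sketch (comment only; the attribute name and hostname are
# hypothetical). Passing value=None deletes the attribute:
#
#   set_host_attribute('example_attribute', 'some-value', hostname='host1')
#   attrs = get_host_attribute('example_attribute', hostname='host1')
#   set_host_attribute('example_attribute', None, hostname='host1')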


@rpc_utils.forward_single_host_rpc_to_shard
def delete_host(id):
    models.Host.smart_get(id).delete()


def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
              exclude_atomic_group_hosts=False, valid_only=True,
              include_current_job=False, **filter_data):
    """Get a list of dictionaries which contain the information of hosts.

    @param multiple_labels: match hosts in all of the labels given.  Should
            be a list of label names.
    @param exclude_only_if_needed_labels: Exclude hosts with at least one
            "only_if_needed" label applied.
    @param exclude_atomic_group_hosts: Exclude hosts that have one or more
            atomic group labels associated with them.
    @param include_current_job: Set to True to include ids of currently running
            job and special task.
    """
    hosts = rpc_utils.get_host_query(multiple_labels,
                                     exclude_only_if_needed_labels,
                                     exclude_atomic_group_hosts,
                                     valid_only, filter_data)
    hosts = list(hosts)
    models.Host.objects.populate_relationships(hosts, models.Label,
                                               'label_list')
    models.Host.objects.populate_relationships(hosts, models.AclGroup,
                                               'acl_list')
    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
                                               'attribute_list')
    host_dicts = []
    for host_obj in hosts:
        host_dict = host_obj.get_object_dict()
        host_dict['labels'] = [label.name for label in host_obj.label_list]
        host_dict['platform'], host_dict['atomic_group'] = (rpc_utils.
                find_platform_and_atomic_group(host_obj))
        host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
        host_dict['attributes'] = dict((attribute.attribute, attribute.value)
                                       for attribute in host_obj.attribute_list)
        if include_current_job:
            host_dict['current_job'] = None
            host_dict['current_special_task'] = None
            entries = models.HostQueueEntry.objects.filter(
                    host_id=host_dict['id'], active=True, complete=False)
            if entries:
                host_dict['current_job'] = (
                        entries[0].get_object_dict()['job'])
            tasks = models.SpecialTask.objects.filter(
                    host_id=host_dict['id'], is_active=True, is_complete=False)
            if tasks:
                host_dict['current_special_task'] = (
                        '%d-%s' % (tasks[0].get_object_dict()['id'],
                                   tasks[0].get_object_dict()['task'].lower()))
        host_dicts.append(host_dict)
    return rpc_utils.prepare_for_serialization(host_dicts)
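
# Illustrative sketch (comment only; the label name is hypothetical). Each
# returned dict carries 'labels', 'platform', 'atomic_group', 'acls' and
# 'attributes' keys in addition to the host's own fields:
#
#   for host in get_hosts(multiple_labels=['pool:example'],
#                         include_current_job=True):
#       print host['hostname'], host['status'], host['current_job']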


def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
                  exclude_atomic_group_hosts=False, valid_only=True,
                  **filter_data):
    """
    Same parameters as get_hosts().

    @returns The number of matching hosts.
    """
    hosts = rpc_utils.get_host_query(multiple_labels,
                                     exclude_only_if_needed_labels,
                                     exclude_atomic_group_hosts,
                                     valid_only, filter_data)
    return hosts.count()


# tests

def add_test(name, test_type, path, author=None, dependencies=None,
             experimental=True, run_verify=None, test_class=None,
             test_time=None, test_category=None, description=None,
             sync_count=1):
    return models.Test.add_object(name=name, test_type=test_type, path=path,
                                  author=author, dependencies=dependencies,
                                  experimental=experimental,
                                  run_verify=run_verify, test_time=test_time,
                                  test_category=test_category,
                                  sync_count=sync_count,
                                  test_class=test_class,
                                  description=description).id


def modify_test(id, **data):
    models.Test.smart_get(id).update_object(data)


def delete_test(id):
    models.Test.smart_get(id).delete()


def get_tests(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Test.list_objects(filter_data))


@_timer.decorate
def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
    """Gets the counts of all passed and failed tests from the matching jobs.

    @param job_name_prefix: Name prefix of the jobs to get the summary from, e.g.,
            'butterfly-release/R40-6457.21.0/bvt-cq/'.
    @param label_name: Label that must be set in the jobs, e.g.,
            'cros-version:butterfly-release/R40-6457.21.0'.

    @returns A summary of the counts of all the passed and failed tests.
    """
    job_ids = list(models.Job.objects.filter(
            name__startswith=job_name_prefix,
            dependency_labels__name=label_name).values_list(
                'pk', flat=True))
    summary = {'passed': 0, 'failed': 0}
    if not job_ids:
        return summary

    counts = (tko_models.TestView.objects.filter(
            afe_job_id__in=job_ids).exclude(
                test_name='SERVER_JOB').exclude(
                    test_name__startswith='CLIENT_JOB').values(
                        'status').annotate(
                            count=Count('status')))
    for status in counts:
        if status['status'] == 'GOOD':
            summary['passed'] += status['count']
        else:
            summary['failed'] += status['count']
    return summary
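
# Illustrative sketch (comment only), using the example values from the
# docstring above. GOOD results count as passed; everything else (excluding
# SERVER_JOB and CLIENT_JOB.* records) counts as failed:
#
#   summary = get_tests_status_counts_by_job_name_label(
#           'butterfly-release/R40-6457.21.0/bvt-cq/',
#           'cros-version:butterfly-release/R40-6457.21.0')
#   # => {'passed': 42, 'failed': 3}  (counts are illustrative)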


# profilers

def add_profiler(name, description=None):
    return models.Profiler.add_object(name=name, description=description).id


def modify_profiler(id, **data):
    models.Profiler.smart_get(id).update_object(data)


def delete_profiler(id):
    models.Profiler.smart_get(id).delete()


def get_profilers(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Profiler.list_objects(filter_data))


# users

def add_user(login, access_level=None):
    return models.User.add_object(login=login, access_level=access_level).id


def modify_user(id, **data):
    models.User.smart_get(id).update_object(data)


def delete_user(id):
    models.User.smart_get(id).delete()


def get_users(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.User.list_objects(filter_data))


# acl groups

def add_acl_group(name, description=None):
    group = models.AclGroup.add_object(name=name, description=description)
    group.users.add(models.User.current_user())
    return group.id


def modify_acl_group(id, **data):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    group.update_object(data)
    group.add_current_user_if_empty()


def acl_group_add_users(id, users):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    users = models.User.smart_get_bulk(users)
    group.users.add(*users)


def acl_group_remove_users(id, users):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    users = models.User.smart_get_bulk(users)
    group.users.remove(*users)
    group.add_current_user_if_empty()


def acl_group_add_hosts(id, hosts):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    hosts = models.Host.smart_get_bulk(hosts)
    group.hosts.add(*hosts)
    group.on_host_membership_change()


def acl_group_remove_hosts(id, hosts):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    hosts = models.Host.smart_get_bulk(hosts)
    group.hosts.remove(*hosts)
    group.on_host_membership_change()


def delete_acl_group(id):
    models.AclGroup.smart_get(id).delete()


def get_acl_groups(**filter_data):
    acl_groups = models.AclGroup.list_objects(filter_data)
    for acl_group in acl_groups:
        acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])
        acl_group['users'] = [user.login
                              for user in acl_group_obj.users.all()]
        acl_group['hosts'] = [host.hostname
                              for host in acl_group_obj.hosts.all()]
    return rpc_utils.prepare_for_serialization(acl_groups)


# jobs

def generate_control_file(tests=(), profilers=(),
                          client_control_file='', use_container=False,
                          profile_only=None, db_tests=True,
                          test_source_build=None):
    """
    Generates a client-side control file to run tests.

    @param tests List of tests to run. See db_tests for more information.
    @param profilers List of profilers to activate during the job.
    @param client_control_file The contents of a client-side control file to
        run at the end of all tests.  If this is supplied, all tests must be
        client side.
        TODO: in the future we should support server control files directly
        to wrap with a kernel.  That'll require changing the parameter
        name and adding a boolean to indicate if it is a client or server
        control file.
    @param use_container unused argument today.  TODO: Enable containers
        on the host during a client side test.
    @param profile_only A boolean that indicates what default profile_only
        mode to use in the control file. Passing None will generate a
        control file that does not explicitly set the default mode at all.
    @param db_tests: if True, the test object can be found in the database
                     backing the test model. In this case, tests is a tuple
                     of test IDs which are used to retrieve the test objects
                     from the database. If False, tests is a tuple of test
                     dictionaries stored client-side in the AFE.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.

    @returns a dict with the following keys:
        control_file: str, The control file text.
        is_server: bool, is the control file a server-side control file?
        synch_count: How many machines the job uses per autoserv execution.
            synch_count == 1 means the job is asynchronous.
        dependencies: A list of the names of labels on which the job depends.
    """
    if not tests and not client_control_file:
        return dict(control_file='', is_server=False, synch_count=1,
                    dependencies=[])

    cf_info, test_objects, profiler_objects = (
        rpc_utils.prepare_generate_control_file(tests, profilers,
                                                db_tests))
    cf_info['control_file'] = control_file.generate_control(
        tests=test_objects, profilers=profiler_objects,
        is_server=cf_info['is_server'],
        client_control_file=client_control_file, profile_only=profile_only,
        test_source_build=test_source_build)
    return cf_info
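
# Illustrative sketch (comment only; 'sleeptest' comes from the module
# docstring, the hostname is hypothetical). The returned control_file text
# is what create_job() expects:
#
#   cf_info = generate_control_file(tests=['sleeptest'])
#   create_job(name='example', priority=30,  # integer; higher is more important
#              control_file=cf_info['control_file'],
#              control_type='Client', hosts=['host1'])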


def create_parameterized_job(name, priority, test, parameters, kernel=None,
                             label=None, profilers=(), profiler_parameters=None,
                             use_container=False, profile_only=None,
                             upload_kernel_config=False, hosts=(),
                             meta_hosts=(), one_time_hosts=(),
                             atomic_group_name=None, synch_count=None,
                             is_template=False, timeout=None,
                             timeout_mins=None, max_runtime_mins=None,
                             run_verify=False, email_list='', dependencies=(),
                             reboot_before=None, reboot_after=None,
                             parse_failed_repair=None, hostless=False,
                             keyvals=None, drone_set=None, run_reset=True,
                             require_ssp=None):
    """
    Creates and enqueues a parameterized job.

    Most parameters are a combination of the parameters for
    generate_control_file() and create_job(), with the exception of:

    @param test name or ID of the test to run
    @param parameters a map of parameter name ->
                          tuple of (param value, param type)
    @param profiler_parameters a dictionary of parameters for the profilers:
                                   key: profiler name
                                   value: dict of param name -> tuple of
                                                                (param value,
                                                                 param type)
    """
    # Save the values of the passed arguments here. What we're going to do with
    # them is pass them all to rpc_utils.get_create_job_common_args(), which
    # will extract the subset of these arguments that apply to
    # rpc_utils.create_job_common(), which we then pass in to that function.
    args = locals()

    # Set up the parameterized job configs
    test_obj = models.Test.smart_get(test)
    control_type = test_obj.test_type

    try:
        label = models.Label.smart_get(label)
    except models.Label.DoesNotExist:
        label = None

    kernel_objs = models.Kernel.create_kernels(kernel)
    profiler_objs = [models.Profiler.smart_get(profiler)
                     for profiler in profilers]

    parameterized_job = models.ParameterizedJob.objects.create(
            test=test_obj, label=label, use_container=use_container,
            profile_only=profile_only,
            upload_kernel_config=upload_kernel_config)
    parameterized_job.kernels.add(*kernel_objs)

    for profiler in profiler_objs:
        parameterized_profiler = models.ParameterizedJobProfiler.objects.create(
                parameterized_job=parameterized_job,
                profiler=profiler)
        # profiler_parameters defaults to None; guard before the lookup.
        profiler_params = (profiler_parameters or {}).get(profiler.name, {})
        for name, (value, param_type) in profiler_params.iteritems():
            models.ParameterizedJobProfilerParameter.objects.create(
                    parameterized_job_profiler=parameterized_profiler,
                    parameter_name=name,
                    parameter_value=value,
                    parameter_type=param_type)

    try:
        for parameter in test_obj.testparameter_set.all():
            if parameter.name in parameters:
                param_value, param_type = parameters.pop(parameter.name)
                parameterized_job.parameterizedjobparameter_set.create(
                        test_parameter=parameter, parameter_value=param_value,
                        parameter_type=param_type)

        if parameters:
            raise Exception('Extra parameters remain: %r' % parameters)

        return rpc_utils.create_job_common(
                parameterized_job=parameterized_job.id,
                control_type=control_type,
                **rpc_utils.get_create_job_common_args(args))
    except:
        parameterized_job.delete()
        raise


def create_job_page_handler(name, priority, control_file, control_type,
                            image=None, hostless=False, firmware_rw_build=None,
                            firmware_ro_build=None, test_source_build=None,
                            is_cloning=False, **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param image: ChromeOS build to be installed on the DUT. Default to None.
    @param firmware_rw_build: Firmware build to update RW firmware. Default to
                              None, i.e., RW firmware will not be updated.
    @param firmware_ro_build: Firmware build to update RO firmware. Default to
                              None, i.e., RO firmware will not be updated.
    @param test_source_build: Build to be used to retrieve test code. Default
                              to None.
    @param is_cloning: True if creating a cloning job.
    @param kwargs extra args that will be required by create_suite_job or
                  create_job.

    @returns The created Job id number.
    """
    if is_cloning:
        logging.info('Start to clone a new job')
        # When cloning a job, hosts and meta_hosts should not exist together,
        # because that would cause the host scheduler to schedule two HQEs on
        # one host at the same time and crash itself. Clear meta_hosts for
        # this case.
        if kwargs.get('hosts') and kwargs.get('meta_hosts'):
            kwargs['meta_hosts'] = []
    else:
        logging.info('Start to create a new job')
    control_file = rpc_utils.encode_ascii(control_file)
    if not control_file:
        raise model_logic.ValidationError({
                'control_file' : "Control file cannot be empty"})

    if image and hostless:
        builds = {}
        builds[provision.CROS_VERSION_PREFIX] = image
        if firmware_rw_build:
            builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
        if firmware_ro_build:
            builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
        return site_rpc_interface.create_suite_job(
                name=name, control_file=control_file, priority=priority,
                builds=builds, test_source_build=test_source_build,
                is_cloning=is_cloning, **kwargs)
    return create_job(name, priority, control_file, control_type, image=image,
                      hostless=hostless, is_cloning=is_cloning, **kwargs)


@rpc_utils.route_rpc_to_master
def create_job(name, priority, control_file, control_type,
               hosts=(), meta_hosts=(), one_time_hosts=(),
               atomic_group_name=None, synch_count=None, is_template=False,
               timeout=None, timeout_mins=None, max_runtime_mins=None,
               run_verify=False, email_list='', dependencies=(),
               reboot_before=None, reboot_after=None, parse_failed_repair=None,
               hostless=False, keyvals=None, drone_set=None, image=None,
               parent_job_id=None, test_retry=0, run_reset=True,
               require_ssp=None, args=(), is_cloning=False, **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job.  Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param synch_count How many machines the job uses per autoserv execution.
        synch_count == 1 means the job is asynchronous.  If an atomic group is
        given this value is treated as a minimum.
    @param is_template If true then create a template job.
    @param timeout Hours after this call returns until the job times out.
    @param timeout_mins Minutes after this call returns until the job times
        out.
    @param max_runtime_mins Minutes from job starting time until job times out
    @param run_verify Should the host be verified before running the test?
    @param email_list String containing emails to mail when the job is done
    @param dependencies List of label names on which this job depends
    @param reboot_before Never, If dirty, or Always
    @param reboot_after Never, If all tests passed, or Always
    @param parse_failed_repair if true, results of failed repairs launched by
        this job will be parsed as part of the job.
    @param hostless if true, create a hostless job
    @param keyvals dict of keyvals to associate with the job
    @param hosts List of hosts to run job on.
    @param meta_hosts List where each entry is a label name, and for each entry
        one host will be chosen from that label to run the job on.
    @param one_time_hosts List of hosts not in the database to run the job on.
    @param atomic_group_name The name of an atomic group to schedule the job on.
    @param drone_set The name of the drone set to run this test on.
    @param image OS image to install before running job.
    @param parent_job_id id of a job considered to be parent of created job.
    @param test_retry Number of times to retry test if the test did not
        complete successfully. (optional, default: 0)
    @param run_reset Should the host be reset before running the test?
    @param require_ssp Set to True to require server-side packaging to run the
                       test. If it's set to None, drone will still try to run
                       the server side with server-side packaging. If the
                       autotest-server package doesn't exist for the build or
                       image is not set, drone will run the test without server-
                       side packaging. Default is None.
    @param args A list of args to be injected into control file.
    @param is_cloning: True if creating a cloning job.
    @param kwargs extra keyword args. NOT USED.

    @returns The created Job id number.
    """
    if args:
        control_file = tools.inject_vars({'args': args}, control_file)

    if image is None:
        return rpc_utils.create_job_common(
                **rpc_utils.get_create_job_common_args(locals()))

    # Translate the image name, in case it's a relative build name.
    ds = dev_server.ImageServer.resolve(image)
    image = ds.translate(image)

    # When image is supplied use a known parameterized test already in the
    # database to pass the OS image path from the front end, through the
    # scheduler, and finally to autoserv as the --image parameter.

    # The test autoupdate_ParameterizedJob is in afe_autotests and is used to
    # instantiate a Test object and from there a ParameterizedJob.
    known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob')
    known_parameterized_job = models.ParameterizedJob.objects.create(
            test=known_test_obj)

    # autoupdate_ParameterizedJob has a single parameter, the image parameter,
    # stored in the table afe_test_parameters.  We retrieve and set this
    # instance of the parameter to the OS image path.
    image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj,
                                                           name='image')
    known_parameterized_job.parameterizedjobparameter_set.create(
            test_parameter=image_parameter, parameter_value=image,
            parameter_type='string')

    # TODO(crbug.com/502638): save firmware build etc to parameterized_job.

    # By passing a parameterized_job to create_job_common the job entry in
    # the afe_jobs table will have the field parameterized_job_id set.
    # The scheduler uses this id in the afe_parameterized_jobs table to
    # match this job to our known test, and then with the
    # afe_parameterized_job_parameters table to get the actual image path.
    return rpc_utils.create_job_common(
            parameterized_job=known_parameterized_job.id,
            **rpc_utils.get_create_job_common_args(locals()))


def abort_host_queue_entries(**filter_data):
    """\
    Abort a set of host queue entries.

    @return: A list of dictionaries, each containing information
             about an aborted HQE.
    """
    query = models.HostQueueEntry.query_objects(filter_data)

    # Don't allow aborts on:
    #   1. Jobs that have already completed (whether or not they were aborted)
    #   2. Jobs that have already been aborted (but may not have completed)
    query = query.filter(complete=False).filter(aborted=False)
    models.AclGroup.check_abort_permissions(query)
    host_queue_entries = list(query.select_related())
    rpc_utils.check_abort_synchronous_jobs(host_queue_entries)

    models.HostQueueEntry.abort_host_queue_entries(host_queue_entries)
    hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id,
                 'Job name': hqe.job.name} for hqe in host_queue_entries]
    return hqe_info
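
# Illustrative sketch (comment only; the filter below assumes Django-style
# lookups on HostQueueEntry fields and a hypothetical job id). Entries that
# are already complete or aborted are silently skipped:
#
#   aborted = abort_host_queue_entries(job__id=42)
#   # => [{'HostQueueEntry': 7, 'Job': 42, 'Job name': 'example-job'}, ...]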


def abort_special_tasks(**filter_data):
    """\
    Abort the special task, or tasks, specified in the filter.
    """
    query = models.SpecialTask.query_objects(filter_data)
    special_tasks = query.filter(is_active=True)
    for task in special_tasks:
        task.abort()


def _call_special_tasks_on_hosts(task, hosts):
    """\
    Schedules a set of hosts for a special task.

    @returns A list of hostnames that a special task was created for.
    """
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
    if shard_host_map and not utils.is_shard():
        raise ValueError('The following hosts are on shards, please '
                         'follow the link to the shards and create jobs '
                         'there instead. %s.' % shard_host_map)
    for host in hosts:
        models.SpecialTask.schedule_special_task(host, task)
    return list(sorted(host.hostname for host in hosts))


def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
    """Forward special tasks to corresponding shards.

    For the master, when special tasks are fired on hosts that are sharded,
    forward the RPC to the corresponding shards.

    For a shard, create special task records in the local DB.

    @param task: Enum value of frontend.afe.models.SpecialTask.Task
    @param rpc: RPC name to forward.
    @param filter_data: Filter keywords to be used for DB query.

    @return: A list of hostnames that a special task was created for.
    """
    hosts = models.Host.query_objects(filter_data)
    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts, rpc_hostnames=True)

    # Filter out hosts on a shard from those on the master, forward
    # rpcs to the shard with an additional hostname__in filter, and
    # create a local SpecialTask for each remaining host.
    if shard_host_map and not utils.is_shard():
        hosts = [h for h in hosts if h.shard is None]
        for shard, hostnames in shard_host_map.iteritems():

            # The main client of this module is the frontend website, and
            # it invokes it with an 'id' or an 'id__in' filter. Regardless,
            # the 'hostname' filter should narrow down the list of hosts on
            # each shard even though we supply all the ids in filter_data.
            # This method uses hostname instead of id because it fits better
            # with the overall architecture of redirection functions in
            # rpc_utils.
            shard_filter = filter_data.copy()
            shard_filter['hostname__in'] = hostnames
            rpc_utils.run_rpc_on_multiple_hostnames(
                    rpc, [shard], **shard_filter)

    # There is a race condition here if someone assigns a shard to one of these
    # hosts before we create the task. The host will stay on the master if:
    # 1. The host is not Ready
    # 2. The host is Ready but has a task
    # But if the host is Ready and doesn't have a task yet, it will get sent
    # to the shard as we're creating a task here.

    # Given that we only rarely verify Ready hosts it isn't worth putting this
    # entire method in a transaction. The worst case scenario is that we have
    # a verify running on a Ready host while the shard is using it; if the
    # verify fails no subsequent tasks will be created against the host on the
    # master, and verifies are safe enough that this is OK.
    return _call_special_tasks_on_hosts(task, hosts)


def reverify_hosts(**filter_data):
    """\
    Schedules a set of hosts for verify.

    @returns A list of hostnames that a verify task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)


def repair_hosts(**filter_data):
    """\
    Schedules a set of hosts for repair.

    @returns A list of hostnames that a repair task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data)


def get_jobs(not_yet_run=False, running=False, finished=False,
             suite=False, sub=False, standalone=False, **filter_data):
    """\
    Extra status filter args for get_jobs:
    -not_yet_run: Include only jobs that have not yet started running.
    -running: Include only jobs that have started running but for which not
    all hosts have completed.
    -finished: Include only jobs for which all hosts have completed (or
    aborted).

    Extra type filter args for get_jobs:
    -suite: Include only jobs with child jobs.
    -sub: Include only jobs with a parent job.
    -standalone: Include only jobs with no child or parent jobs.
    At most one of these three fields should be specified.
    """
    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                    running,
                                                    finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    job_dicts = []
    jobs = list(models.Job.query_objects(filter_data))
    models.Job.objects.populate_relationships(jobs, models.Label,
                                              'dependencies')
    models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
    for job in jobs:
        job_dict = job.get_object_dict()
        job_dict['dependencies'] = ','.join(label.name
                                            for label in job.dependencies)
        job_dict['keyvals'] = dict((keyval.key, keyval.value)
                                   for keyval in job.keyvals)
        if job.parameterized_job:
            job_dict['image'] = get_parameterized_autoupdate_image_url(job)
        job_dicts.append(job_dict)
    return rpc_utils.prepare_for_serialization(job_dicts)
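
# Illustrative sketch (comment only), mirroring the module docstring example.
# At most one of the suite/sub/standalone flags should be set per call:
#
#   queued = get_jobs(owner='showard', not_yet_run=True)
#   suites = get_jobs(suite=True, running=True)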


def get_num_jobs(not_yet_run=False, running=False, finished=False,
                 suite=False, sub=False, standalone=False,
                 **filter_data):
    """\
    See get_jobs() for documentation of extra filter parameters.
    """
    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                    running,
                                                    finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    return models.Job.query_count(filter_data)


def get_jobs_summary(**filter_data):
    """\
    Like get_jobs(), but adds a 'status_counts' and a 'result_counts' field.

    The 'status_counts' field is a dictionary mapping status strings to the
    number of hosts currently with that status, i.e. {'Queued' : 4,
    'Running' : 2}.

    The 'result_counts' field is piped to tko's rpc_interface and has the
    return format specified under get_group_counts.
    """
    jobs = get_jobs(**filter_data)
    ids = [job['id'] for job in jobs]
    all_status_counts = models.Job.objects.get_status_counts(ids)
    for job in jobs:
        job['status_counts'] = all_status_counts[job['id']]
        job['result_counts'] = tko_rpc_interface.get_status_counts(
                ['afe_job_id', 'afe_job_id'],
                header_groups=[['afe_job_id'], ['afe_job_id']],
                **{'afe_job_id': job['id']})
    return rpc_utils.prepare_for_serialization(jobs)
1244
1245
1246def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
1247    """\
1248    Retrieves all the information needed to clone a job.
1249    """
1250    job = models.Job.objects.get(id=id)
1251    job_info = rpc_utils.get_job_info(job,
1252                                      preserve_metahosts,
1253                                      queue_entry_filter_data)
1254
1255    host_dicts = []
1256    for host in job_info['hosts']:
1257        host_dict = get_hosts(id=host.id)[0]
1258        other_labels = host_dict['labels']
1259        if host_dict['platform']:
1260            other_labels.remove(host_dict['platform'])
1261        host_dict['other_labels'] = ', '.join(other_labels)
1262        host_dicts.append(host_dict)
1263
1264    for host in job_info['one_time_hosts']:
1265        host_dict = dict(hostname=host.hostname,
1266                         id=host.id,
1267                         platform='(one-time host)',
1268                         locked_text='')
1269        host_dicts.append(host_dict)
1270
1271    # convert keys from Label objects to strings (names of labels)
1272    meta_host_counts = dict((meta_host.name, count) for meta_host, count
1273                            in job_info['meta_host_counts'].iteritems())
1274
1275    info = dict(job=job.get_object_dict(),
1276                meta_host_counts=meta_host_counts,
1277                hosts=host_dicts)
1278    info['job']['dependencies'] = job_info['dependencies']
1279    if job_info['atomic_group']:
        info['atomic_group_name'] = job_info['atomic_group'].name
1281    else:
1282        info['atomic_group_name'] = None
1283    info['hostless'] = job_info['hostless']
1284    info['drone_set'] = job.drone_set and job.drone_set.name
1285
1286    image = _get_image_for_job(job, job_info['hostless'])
1287    if image:
1288        info['job']['image'] = image
1289
1290    return rpc_utils.prepare_for_serialization(info)
1291
1292
1293def _get_image_for_job(job, hostless):
1294    """ Gets the image used for a job.
1295
1296    Gets the image used for an AFE job. If the job is a parameterized job, get
1297    the image from the job parameter; otherwise, tries to get the image from
1298    the job's keyvals 'build' or 'builds'. As a last resort, if the job is a
1299    hostless job, tries to get the image from its control file attributes
1300    'build' or 'builds'.
1301
1302    TODO(ntang): Needs to handle FAFT with two builds for ro/rw.
1303
1304    @param job      An AFE job object.
    @param hostless Boolean for whether the job is hostless.
1306
1307    @returns The image build used for the job.
1308    """
1309    image = None
1310    if job.parameterized_job:
1311        image = get_parameterized_autoupdate_image_url(job)
1312    else:
1313        keyvals = job.keyval_dict()
1314        image = keyvals.get('build')
1315        if not image:
1316            value = keyvals.get('builds')
1317            builds = None
1318            if isinstance(value, dict):
1319                builds = value
1320            elif isinstance(value, basestring):
1321                builds = ast.literal_eval(value)
1322            if builds:
1323                image = builds.get('cros-version')
1324        if not image and hostless and job.control_file:
1325            try:
1326                control_obj = control_data.parse_control_string(
1327                        job.control_file)
1328                if hasattr(control_obj, 'build'):
1329                    image = getattr(control_obj, 'build')
1330                if not image and hasattr(control_obj, 'builds'):
1331                    builds = getattr(control_obj, 'builds')
1332                    image = builds.get('cros-version')
            except Exception:
1334                logging.warning('Failed to parse control file for job: %s',
1335                                job.name)
1336    return image
1337
1338
1339def get_host_queue_entries(start_time=None, end_time=None, **filter_data):
1340    """\
1341    @returns A sequence of nested dictionaries of host and job information.
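
    Example (illustrative Django-style filter):
        get_host_queue_entries(job__id=42)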
1342    """
1343    filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
1344                                                   'started_on__lte',
1345                                                   start_time,
1346                                                   end_time,
1347                                                   **filter_data)
1348    return rpc_utils.prepare_rows_as_nested_dicts(
1349            models.HostQueueEntry.query_objects(filter_data),
1350            ('host', 'atomic_group', 'job'))
1351
1352
1353def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
1354    """\
    Get the number of host queue entries matching the given filter data.
1356    """
1357    filter_data = rpc_utils.inject_times_to_filter('started_on__gte',
1358                                                   'started_on__lte',
1359                                                   start_time,
1360                                                   end_time,
1361                                                   **filter_data)
1362    return models.HostQueueEntry.query_count(filter_data)
1363
1364
1365def get_hqe_percentage_complete(**filter_data):
1366    """
1367    Computes the fraction of host queue entries matching the given filter data
1368    that are complete.
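
    Example (illustrative Django-style filter):
        get_hqe_percentage_complete(job__id=42)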
1369    """
1370    query = models.HostQueueEntry.query_objects(filter_data)
1371    complete_count = query.filter(complete=True).count()
1372    total_count = query.count()
1373    if total_count == 0:
1374        return 1
1375    return float(complete_count) / total_count
1376
1377
1378# special tasks
1379
1380def get_special_tasks(**filter_data):
1381    """Get special task entries from the local database.
1382
1383    Query the special tasks table for tasks matching the given
1384    `filter_data`, and return a list of the results.  No attempt is
1385    made to forward the call to shards; the buck will stop here.
1386    The caller is expected to know the target shard for such reasons
1387    as:
1388      * The caller is a service (such as gs_offloader) configured
1389        to operate on behalf of one specific shard, and no other.
1390      * The caller has a host as a parameter, and knows that this is
1391        the shard assigned to that host.
1392
1393    @param filter_data  Filter keywords to pass to the underlying
1394                        database query.
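
    Example (illustrative filter):
        get_special_tasks(task='Repair', success=False)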
1395
1396    """
1397    return rpc_utils.prepare_rows_as_nested_dicts(
1398            models.SpecialTask.query_objects(filter_data),
1399            ('host', 'queue_entry'))
1400
1401
1402def get_host_special_tasks(host_id, **filter_data):
1403    """Get special task entries for a given host.
1404
1405    Query the special tasks table for tasks that ran on the host
1406    given by `host_id` and matching the given `filter_data`.
1407    Return a list of the results.  If the host is assigned to a
1408    shard, forward this call to that shard.
1409
1410    @param host_id      Id in the database of the target host.
1411    @param filter_data  Filter keywords to pass to the underlying
1412                        database query.
1413
1414    """
1415    # Retrieve host data even if the host is in an invalid state.
1416    host = models.Host.smart_get(host_id, False)
1417    if not host.shard:
1418        return get_special_tasks(host_id=host_id, **filter_data)
1419    else:
1420        # The return values from AFE methods are post-processed
1421        # objects that aren't JSON-serializable.  So, we have to
1422        # call AFE.run() to get the raw, serializable output from
1423        # the shard.
1424        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
1425        return shard_afe.run('get_special_tasks',
1426                             host_id=host_id, **filter_data)
1427
1428
1429def get_num_special_tasks(**kwargs):
1430    """Get the number of special task entries from the local database.
1431
1432    Query the special tasks table for tasks matching the given 'kwargs',
    and return the number of results. No attempt is made to forward
1434    the call to shards; the buck will stop here.
1435
1436    @param kwargs    Filter keywords to pass to the underlying database query.
1437
1438    """
1439    return models.SpecialTask.query_count(kwargs)
1440
1441
1442def get_host_num_special_tasks(host, **kwargs):
1443    """Get special task entries for a given host.
1444
1445    Query the special tasks table for tasks that ran on the host
1446    given by 'host' and matching the given 'kwargs'.
1447    Return a list of the results.  If the host is assigned to a
1448    shard, forward this call to that shard.
1449
1450    @param host      id or name of a host. More often a hostname.
1451    @param kwargs    Filter keywords to pass to the underlying database query.
1452
1453    """
1454    # Retrieve host data even if the host is in an invalid state.
1455    host_model = models.Host.smart_get(host, False)
1456    if not host_model.shard:
1457        return get_num_special_tasks(host=host, **kwargs)
1458    else:
1459        shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname())
1460        return shard_afe.run('get_num_special_tasks', host=host, **kwargs)
1461
1462
1463def get_status_task(host_id, end_time):
1464    """Get the "status task" for a host from the local shard.
1465
1466    Returns a single special task representing the given host's
1467    "status task".  The status task is a completed special task that
1468    identifies whether the corresponding host was working or broken
1469    when it completed.  A successful task indicates a working host;
1470    a failed task indicates broken.
1471
    This call will not be forwarded to a shard; the receiving server
1473    must be the shard that owns the host.
1474
1475    @param host_id      Id in the database of the target host.
1476    @param end_time     Time reference for the host's status.
1477
1478    @return A single task; its status (successful or not)
1479            corresponds to the status of the host (working or
1480            broken) at the given time.  If no task is found, return
1481            `None`.
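
    Example (illustrative; assumes the serialized task includes the
    'success' field):
        task = get_status_task(host_id, end_time)
        host_was_working = bool(task and task['success'])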
1482
1483    """
1484    tasklist = rpc_utils.prepare_rows_as_nested_dicts(
1485            status_history.get_status_task(host_id, end_time),
1486            ('host', 'queue_entry'))
1487    return tasklist[0] if tasklist else None
1488
1489
1490def get_host_status_task(host_id, end_time):
1491    """Get the "status task" for a host from its owning shard.
1492
1493    Finds the given host's owning shard, and forwards to it a call
1494    to `get_status_task()` (see above).
1495
1496    @param host_id      Id in the database of the target host.
1497    @param end_time     Time reference for the host's status.
1498
1499    @return A single task; its status (successful or not)
1500            corresponds to the status of the host (working or
1501            broken) at the given time.  If no task is found, return
1502            `None`.
1503
1504    """
1505    host = models.Host.smart_get(host_id)
1506    if not host.shard:
1507        return get_status_task(host_id, end_time)
1508    else:
1509        # The return values from AFE methods are post-processed
1510        # objects that aren't JSON-serializable.  So, we have to
1511        # call AFE.run() to get the raw, serializable output from
1512        # the shard.
1513        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
1514        return shard_afe.run('get_status_task',
1515                             host_id=host_id, end_time=end_time)
1516
1517
1518def get_host_diagnosis_interval(host_id, end_time, success):
1519    """Find a "diagnosis interval" for a given host.
1520
1521    A "diagnosis interval" identifies a start and end time where
1522    the host went from "working" to "broken", or vice versa.  The
1523    interval's starting time is the starting time of the last status
1524    task with the old status; the end time is the finish time of the
1525    first status task with the new status.
1526
1527    This routine finds the most recent diagnosis interval for the
1528    given host prior to `end_time`, with a starting status matching
1529    `success`.  If `success` is true, the interval will start with a
    successful status task; if false, the interval will start with a
1531    failed status task.
1532
1533    @param host_id      Id in the database of the target host.
1534    @param end_time     Time reference for the diagnosis interval.
1535    @param success      Whether the diagnosis interval should start
1536                        with a successful or failed status task.
1537
1538    @return A list of two strings.  The first is the timestamp for
1539            the beginning of the interval; the second is the
1540            timestamp for the end.  If the host has never changed
1541            state, the list is empty.
1542
1543    """
1544    host = models.Host.smart_get(host_id)
1545    if not host.shard or utils.is_shard():
1546        return status_history.get_diagnosis_interval(
1547                host_id, end_time, success)
1548    else:
1549        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
1550        return shard_afe.get_host_diagnosis_interval(
1551                host_id, end_time, success)
1552
1553
1554# support for host detail view
1555
1556def get_host_queue_entries_and_special_tasks(host, query_start=None,
1557                                             query_limit=None, start_time=None,
1558                                             end_time=None):
1559    """
    @returns An interleaved list of HostQueueEntries and SpecialTasks,
            in approximate run order.  Each dict contains keys for type, host,
            job, status, started_on, execution_path, and ID.
1563    """
    total_limit = None
    if query_limit is not None:
        # query_start may be None; treat it as 0 when computing the
        # combined limit for the underlying queries.
        total_limit = (query_start or 0) + query_limit
1567    filter_data_common = {'host': host,
1568                          'query_limit': total_limit,
1569                          'sort_by': ['-id']}
1570
1571    filter_data_special_tasks = rpc_utils.inject_times_to_filter(
1572            'time_started__gte', 'time_started__lte', start_time, end_time,
1573            **filter_data_common)
1574
1575    queue_entries = get_host_queue_entries(
1576            start_time, end_time, **filter_data_common)
1577    special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)
1578
1579    interleaved_entries = rpc_utils.interleave_entries(queue_entries,
1580                                                       special_tasks)
1581    if query_start is not None:
1582        interleaved_entries = interleaved_entries[query_start:]
1583    if query_limit is not None:
1584        interleaved_entries = interleaved_entries[:query_limit]
1585    return rpc_utils.prepare_host_queue_entries_and_special_tasks(
1586            interleaved_entries, queue_entries)
1587
1588
1589def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
1590                                                 end_time=None):
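    """Get the combined number of HQEs and special tasks for a host.

    @param host        Id or name of the target host.
    @param start_time  Optional lower bound on entry start times.
    @param end_time    Optional upper bound on entry start times.
    """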
1591    filter_data_common = {'host': host}
1592
1593    filter_data_queue_entries, filter_data_special_tasks = (
1594            rpc_utils.inject_times_to_hqe_special_tasks_filters(
1595                    filter_data_common, start_time, end_time))
1596
1597    return (models.HostQueueEntry.query_count(filter_data_queue_entries)
1598            + get_host_num_special_tasks(**filter_data_special_tasks))
1599
1600
1601# recurring run
1602
1603def get_recurring(**filter_data):
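    """Get recurring run entries matching the given filter data."""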
1604    return rpc_utils.prepare_rows_as_nested_dicts(
1605            models.RecurringRun.query_objects(filter_data),
1606            ('job', 'owner'))
1607
1608
1609def get_num_recurring(**filter_data):
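    """Get the number of recurring run entries matching the filter data."""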
1610    return models.RecurringRun.query_count(filter_data)
1611
1612
1613def delete_recurring_runs(**filter_data):
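    """Delete all recurring run entries matching the given filter data."""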
1614    to_delete = models.RecurringRun.query_objects(filter_data)
1615    to_delete.delete()
1616
1617
1618def create_recurring_run(job_id, start_date, loop_period, loop_count):
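    """Create a recurring run of a job, owned by the current user.

    @param job_id: Database id of the job to recur.
    @param start_date: When the first recurrence should be scheduled.
    @param loop_period: Interval between consecutive runs.
    @param loop_count: Total number of times to run the job.
    """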
1619    owner = models.User.current_user().login
1620    job = models.Job.objects.get(id=job_id)
1621    return job.create_recurring_job(start_date=start_date,
1622                                    loop_period=loop_period,
1623                                    loop_count=loop_count,
1624                                    owner=owner)
1625
1626
1627# other
1628
1629def echo(data=""):
1630    """\
    Returns the passed-in string. Useful as a basic test that RPC calls
    can be made successfully.
1633    """
1634    return data
1635
1636
1637def get_motd():
1638    """\
1639    Returns the message of the day as a string.
1640    """
1641    return rpc_utils.get_motd()
1642
1643
1644def get_static_data():
1645    """\
1646    Returns a dictionary containing a bunch of data that shouldn't change
1647    often and is otherwise inaccessible.  This includes:
1648
1649    priorities: List of job priority choices.
1650    default_priority: Default priority value for new jobs.
1651    users: Sorted list of all users.
    labels: Sorted list of labels not starting with 'cros-version',
            'fw-version', or other version prefixes.
1654    atomic_groups: Sorted list of all atomic groups.
1655    tests: Sorted list of all tests.
1656    profilers: Sorted list of all profilers.
1657    current_user: Logged-in username.
1658    host_statuses: Sorted list of possible Host statuses.
1659    job_statuses: Sorted list of possible HostQueueEntry statuses.
1660    job_timeout_default: The default job timeout length in minutes.
1661    parse_failed_repair_default: Default value for the parse_failed_repair job
1662            option.
1663    reboot_before_options: A list of valid RebootBefore string enums.
1664    reboot_after_options: A list of valid RebootAfter string enums.
1665    motd: Server's message of the day.
    status_dictionary: A mapping from one-word job status names to more
            informative descriptions.
1668    """
1669
1670    job_fields = models.Job.get_field_dict()
1671    default_drone_set_name = models.DroneSet.default_drone_set_name()
1672    drone_sets = ([default_drone_set_name] +
1673                  sorted(drone_set.name for drone_set in
1674                         models.DroneSet.objects.exclude(
1675                                 name=default_drone_set_name)))
1676
1677    result = {}
1678    result['priorities'] = priorities.Priority.choices()
    result['default_priority'] = 'Default'
    result['max_schedulable_priority'] = priorities.Priority.DEFAULT
1682    result['users'] = get_users(sort_by=['login'])
1683
1684    label_exclude_filters = [{'name__startswith': 'cros-version'},
1685                             {'name__startswith': 'fw-version'},
1686                             {'name__startswith': 'fwrw-version'},
1687                             {'name__startswith': 'fwro-version'},
1688                             {'name__startswith': 'ab-version'},
1689                             {'name__startswith': 'testbed-version'}]
1690    result['labels'] = get_labels(
1691        label_exclude_filters,
1692        sort_by=['-platform', 'name'])
1693
1694    result['atomic_groups'] = get_atomic_groups(sort_by=['name'])
1695    result['tests'] = get_tests(sort_by=['name'])
1696    result['profilers'] = get_profilers(sort_by=['name'])
1697    result['current_user'] = rpc_utils.prepare_for_serialization(
1698        models.User.current_user().get_object_dict())
1699    result['host_statuses'] = sorted(models.Host.Status.names)
1700    result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
1701    result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS
1702    result['job_max_runtime_mins_default'] = (
1703        models.Job.DEFAULT_MAX_RUNTIME_MINS)
1704    result['parse_failed_repair_default'] = bool(
1705        models.Job.DEFAULT_PARSE_FAILED_REPAIR)
1706    result['reboot_before_options'] = model_attributes.RebootBefore.names
1707    result['reboot_after_options'] = model_attributes.RebootAfter.names
1708    result['motd'] = rpc_utils.get_motd()
1709    result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
1710    result['drone_sets'] = drone_sets
1711    result['parameterized_jobs'] = models.Job.parameterized_jobs_enabled()
1712
1713    result['status_dictionary'] = {"Aborted": "Aborted",
1714                                   "Verifying": "Verifying Host",
1715                                   "Provisioning": "Provisioning Host",
1716                                   "Pending": "Waiting on other hosts",
1717                                   "Running": "Running autoserv",
1718                                   "Completed": "Autoserv completed",
1719                                   "Failed": "Failed to complete",
1720                                   "Queued": "Queued",
1721                                   "Starting": "Next in host's queue",
1722                                   "Stopped": "Other host(s) failed verify",
1723                                   "Parsing": "Awaiting parse of final results",
1724                                   "Gathering": "Gathering log files",
1725                                   "Template": "Template job for recurring run",
1726                                   "Waiting": "Waiting for scheduler action",
1727                                   "Archiving": "Archiving results",
1728                                   "Resetting": "Resetting hosts"}
1729
1730    result['wmatrix_url'] = rpc_utils.get_wmatrix_url()
1731    result['is_moblab'] = bool(utils.is_moblab())
1732
1733    return result
1734
1735
1736def get_server_time():
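    """Return the current server time as a 'YYYY-MM-DD HH:MM' string."""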
1737    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
1738
1739
1740def get_hosts_by_attribute(attribute, value):
1741    """
1742    Get the list of valid hosts that share the same host attribute value.
1743
1744    @param attribute: String of the host attribute to check.
1745    @param value: String of the value that is shared between hosts.
1746
1747    @returns List of hostnames that all have the same host attribute and
1748             value.
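
    Example (hypothetical attribute and value):
        get_hosts_by_attribute('example_attribute', 'shared_value')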
1749    """
1750    hosts = models.HostAttribute.query_objects({'attribute': attribute,
1751                                                'value': value})
1752    return [row.host.hostname for row in hosts if row.host.invalid == 0]
1753