# rpc_interface.py revision b5b8b4f981036971c619b26956c8847140eabd35
1# pylint: disable-msg=C0111 2 3"""\ 4Functions to expose over the RPC interface. 5 6For all modify* and delete* functions that ask for an 'id' parameter to 7identify the object to operate on, the id may be either 8 * the database row ID 9 * the name of the object (label name, hostname, user login, etc.) 10 * a dictionary containing uniquely identifying field (this option should seldom 11 be used) 12 13When specifying foreign key fields (i.e. adding hosts to a label, or adding 14users to an ACL group), the given value may be either the database row ID or the 15name of the object. 16 17All get* functions return lists of dictionaries. Each dictionary represents one 18object and maps field names to values. 19 20Some examples: 21modify_host(2, hostname='myhost') # modify hostname of host with database ID 2 22modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2' 23modify_test('sleeptest', test_type='Client', params=', seconds=60') 24delete_acl_group(1) # delete by ID 25delete_acl_group('Everyone') # delete by name 26acl_group_add_users('Everyone', ['mbligh', 'showard']) 27get_jobs(owner='showard', status='Queued') 28 29See doctests/001_rpc_test.txt for (lots) more examples. 
30""" 31 32__author__ = 'showard@google.com (Steve Howard)' 33 34import ast 35import datetime 36import logging 37import sys 38 39from django.db.models import Count 40import common 41from autotest_lib.client.common_lib import control_data 42from autotest_lib.client.common_lib import priorities 43from autotest_lib.client.common_lib.cros import dev_server 44from autotest_lib.client.common_lib.cros.graphite import autotest_stats 45from autotest_lib.frontend.afe import control_file, rpc_utils 46from autotest_lib.frontend.afe import models, model_logic, model_attributes 47from autotest_lib.frontend.afe import site_rpc_interface 48from autotest_lib.frontend.tko import models as tko_models 49from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface 50from autotest_lib.server import frontend 51from autotest_lib.server import utils 52from autotest_lib.server.cros import provision 53from autotest_lib.server.cros.dynamic_suite import tools 54from autotest_lib.server.lib import status_history 55 56 57_timer = autotest_stats.Timer('rpc_interface') 58 59def get_parameterized_autoupdate_image_url(job): 60 """Get the parameterized autoupdate image url from a parameterized job.""" 61 known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob') 62 image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj, 63 name='image') 64 para_set = job.parameterized_job.parameterizedjobparameter_set 65 job_test_para = para_set.get(test_parameter=image_parameter) 66 return job_test_para.parameter_value 67 68 69# labels 70 71def modify_label(id, **data): 72 """Modify a label. 73 74 @param id: id or name of a label. More often a label name. 75 @param data: New data for a label. 
76 """ 77 label_model = models.Label.smart_get(id) 78 label_model.update_object(data) 79 80 # Master forwards the RPC to shards 81 if not utils.is_shard(): 82 rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False, 83 id=id, **data) 84 85 86def delete_label(id): 87 """Delete a label. 88 89 @param id: id or name of a label. More often a label name. 90 """ 91 label_model = models.Label.smart_get(id) 92 # Hosts that have the label to be deleted. Save this info before 93 # the label is deleted to use it later. 94 hosts = [] 95 for h in label_model.host_set.all(): 96 hosts.append(models.Host.smart_get(h.id)) 97 label_model.delete() 98 99 # Master forwards the RPC to shards 100 if not utils.is_shard(): 101 rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id) 102 103 104def add_label(name, ignore_exception_if_exists=False, **kwargs): 105 """Adds a new label of a given name. 106 107 @param name: label name. 108 @param ignore_exception_if_exists: If True and the exception was 109 thrown due to the duplicated label name when adding a label, 110 then suppress the exception. Default is False. 111 @param kwargs: keyword args that store more info about a label 112 other than the name. 113 @return: int/long id of a new label. 114 """ 115 # models.Label.add_object() throws model_logic.ValidationError 116 # when it is given a label name that already exists. 117 # However, ValidationError can be thrown with different errors, 118 # and those errors should be thrown up to the call chain. 119 try: 120 label = models.Label.add_object(name=name, **kwargs) 121 except: 122 exc_info = sys.exc_info() 123 if ignore_exception_if_exists: 124 label = rpc_utils.get_label(name) 125 # If the exception is raised not because of duplicated 126 # "name", then raise the original exception. 
127 if label is None: 128 raise exc_info[0], exc_info[1], exc_info[2] 129 else: 130 raise exc_info[0], exc_info[1], exc_info[2] 131 return label.id 132 133 134def add_label_to_hosts(id, hosts): 135 """Adds a label of the given id to the given hosts only in local DB. 136 137 @param id: id or name of a label. More often a label name. 138 @param hosts: The hostnames of hosts that need the label. 139 140 @raises models.Label.DoesNotExist: If the label with id doesn't exist. 141 """ 142 label = models.Label.smart_get(id) 143 host_objs = models.Host.smart_get_bulk(hosts) 144 if label.platform: 145 models.Host.check_no_platform(host_objs) 146 # Ensure a host has no more than one board label with it. 147 if label.name.startswith('board:'): 148 models.Host.check_board_labels_allowed(host_objs, [label.name]) 149 label.host_set.add(*host_objs) 150 151 152def _create_label_everywhere(id, hosts): 153 """ 154 Yet another method to create labels. 155 156 ALERT! This method should be run only on master not shards! 157 DO NOT RUN THIS ON A SHARD!!! Deputies will hate you if you do!!! 158 159 This method exists primarily to serve label_add_hosts() and 160 host_add_labels(). Basically it pulls out the label check/add logic 161 from label_add_hosts() into this nice method that not only creates 162 the label but also tells the shards that service the hosts to also 163 create the label. 164 165 @param id: id or name of a label. More often a label name. 166 @param hosts: A list of hostnames or ids. More often hostnames. 167 """ 168 try: 169 label = models.Label.smart_get(id) 170 except models.Label.DoesNotExist: 171 # This matches the type checks in smart_get, which is a hack 172 # in and off itself. The aim here is to create any non-existent 173 # label, which we cannot do if the 'id' specified isn't a label name. 174 if isinstance(id, basestring): 175 label = models.Label.smart_get(add_label(id)) 176 else: 177 raise ValueError('Label id (%s) does not exist. 
Please specify ' 178 'the argument, id, as a string (label name).' 179 % id) 180 181 # Make sure the label exists on the shard with the same id 182 # as it is on the master. 183 # It is possible that the label is already in a shard because 184 # we are adding a new label only to shards of hosts that the label 185 # is going to be attached. 186 # For example, we add a label L1 to a host in shard S1. 187 # Master and S1 will have L1 but other shards won't. 188 # Later, when we add the same label L1 to hosts in shards S1 and S2, 189 # S1 already has the label but S2 doesn't. 190 # S2 should have the new label without any problem. 191 # We ignore exception in such a case. 192 host_objs = models.Host.smart_get_bulk(hosts) 193 rpc_utils.fanout_rpc( 194 host_objs, 'add_label', include_hostnames=False, 195 name=label.name, ignore_exception_if_exists=True, 196 id=label.id, platform=label.platform) 197 198 199@rpc_utils.route_rpc_to_master 200def label_add_hosts(id, hosts): 201 """Adds a label with the given id to the given hosts. 202 203 This method should be run only on master not shards. 204 The given label will be created if it doesn't exist, provided the `id` 205 supplied is a label name not an int/long id. 206 207 @param id: id or name of a label. More often a label name. 208 @param hosts: A list of hostnames or ids. More often hostnames. 209 210 @raises ValueError: If the id specified is an int/long (label id) 211 while the label does not exist. 212 """ 213 # Create the label. 214 _create_label_everywhere(id, hosts) 215 216 # Add it to the master. 217 add_label_to_hosts(id, hosts) 218 219 # Add it to the shards. 220 host_objs = models.Host.smart_get_bulk(hosts) 221 rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id) 222 223 224def remove_label_from_hosts(id, hosts): 225 """Removes a label of the given id from the given hosts only in local DB. 226 227 @param id: id or name of a label. 228 @param hosts: The hostnames of hosts that need to remove the label from. 
229 """ 230 host_objs = models.Host.smart_get_bulk(hosts) 231 models.Label.smart_get(id).host_set.remove(*host_objs) 232 233 234@rpc_utils.route_rpc_to_master 235def label_remove_hosts(id, hosts): 236 """Removes a label of the given id from the given hosts. 237 238 This method should be run only on master not shards. 239 240 @param id: id or name of a label. 241 @param hosts: A list of hostnames or ids. More often hostnames. 242 """ 243 host_objs = models.Host.smart_get_bulk(hosts) 244 remove_label_from_hosts(id, hosts) 245 246 rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id) 247 248 249def get_labels(exclude_filters=(), **filter_data): 250 """\ 251 @param exclude_filters: A sequence of dictionaries of filters. 252 253 @returns A sequence of nested dictionaries of label information. 254 """ 255 labels = models.Label.query_objects(filter_data) 256 for exclude_filter in exclude_filters: 257 labels = labels.exclude(**exclude_filter) 258 return rpc_utils.prepare_rows_as_nested_dicts(labels, ('atomic_group',)) 259 260 261# atomic groups 262 263def add_atomic_group(name, max_number_of_machines=None, description=None): 264 return models.AtomicGroup.add_object( 265 name=name, max_number_of_machines=max_number_of_machines, 266 description=description).id 267 268 269def modify_atomic_group(id, **data): 270 models.AtomicGroup.smart_get(id).update_object(data) 271 272 273def delete_atomic_group(id): 274 models.AtomicGroup.smart_get(id).delete() 275 276 277def atomic_group_add_labels(id, labels): 278 label_objs = models.Label.smart_get_bulk(labels) 279 models.AtomicGroup.smart_get(id).label_set.add(*label_objs) 280 281 282def atomic_group_remove_labels(id, labels): 283 label_objs = models.Label.smart_get_bulk(labels) 284 models.AtomicGroup.smart_get(id).label_set.remove(*label_objs) 285 286 287def get_atomic_groups(**filter_data): 288 return rpc_utils.prepare_for_serialization( 289 models.AtomicGroup.list_objects(filter_data)) 290 291 292# hosts 293 294def 
add_host(hostname, status=None, locked=None, lock_reason='', protection=None): 295 if locked and not lock_reason: 296 raise model_logic.ValidationError( 297 {'locked': 'Please provide a reason for locking when adding host.'}) 298 299 return models.Host.add_object(hostname=hostname, status=status, 300 locked=locked, lock_reason=lock_reason, 301 protection=protection).id 302 303 304@rpc_utils.route_rpc_to_master 305def modify_host(id, **kwargs): 306 """Modify local attributes of a host. 307 308 If this is called on the master, but the host is assigned to a shard, this 309 will call `modify_host_local` RPC to the responsible shard. This means if 310 a host is being locked using this function, this change will also propagate 311 to shards. 312 When this is called on a shard, the shard just routes the RPC to the master 313 and does nothing. 314 315 @param id: id of the host to modify. 316 @param kwargs: key=value pairs of values to set on the host. 317 """ 318 rpc_utils.check_modify_host(kwargs) 319 host = models.Host.smart_get(id) 320 try: 321 rpc_utils.check_modify_host_locking(host, kwargs) 322 except model_logic.ValidationError as e: 323 if not kwargs.get('force_modify_locking', False): 324 raise 325 logging.exception('The following exception will be ignored and lock ' 326 'modification will be enforced. %s', e) 327 328 # This is required to make `lock_time` for a host be exactly same 329 # between the master and a shard. 330 if kwargs.get('locked', None) and 'lock_time' not in kwargs: 331 kwargs['lock_time'] = datetime.datetime.now() 332 host.update_object(kwargs) 333 334 # force_modifying_locking is not an internal field in database, remove. 335 kwargs.pop('force_modify_locking', None) 336 rpc_utils.fanout_rpc([host], 'modify_host_local', 337 include_hostnames=False, id=id, **kwargs) 338 339 340def modify_host_local(id, **kwargs): 341 """Modify host attributes in local DB. 342 343 @param id: Host id. 344 @param kwargs: key=value pairs of values to set on the host. 
345 """ 346 models.Host.smart_get(id).update_object(kwargs) 347 348 349@rpc_utils.route_rpc_to_master 350def modify_hosts(host_filter_data, update_data): 351 """Modify local attributes of multiple hosts. 352 353 If this is called on the master, but one of the hosts in that match the 354 filters is assigned to a shard, this will call `modify_hosts_local` RPC 355 to the responsible shard. 356 When this is called on a shard, the shard just routes the RPC to the master 357 and does nothing. 358 359 The filters are always applied on the master, not on the shards. This means 360 if the states of a host differ on the master and a shard, the state on the 361 master will be used. I.e. this means: 362 A host was synced to Shard 1. On Shard 1 the status of the host was set to 363 'Repair Failed'. 364 - A call to modify_hosts with host_filter_data={'status': 'Ready'} will 365 update the host (both on the shard and on the master), because the state 366 of the host as the master knows it is still 'Ready'. 367 - A call to modify_hosts with host_filter_data={'status': 'Repair failed' 368 will not update the host, because the filter doesn't apply on the master. 369 370 @param host_filter_data: Filters out which hosts to modify. 371 @param update_data: A dictionary with the changes to make to the hosts. 372 """ 373 update_data = update_data.copy() 374 rpc_utils.check_modify_host(update_data) 375 hosts = models.Host.query_objects(host_filter_data) 376 377 affected_shard_hostnames = set() 378 affected_host_ids = [] 379 380 # Check all hosts before changing data for exception safety. 381 for host in hosts: 382 try: 383 rpc_utils.check_modify_host_locking(host, update_data) 384 except model_logic.ValidationError as e: 385 if not update_data.get('force_modify_locking', False): 386 raise 387 logging.exception('The following exception will be ignored and ' 388 'lock modification will be enforced. 
%s', e) 389 390 if host.shard: 391 affected_shard_hostnames.add(host.shard.rpc_hostname()) 392 affected_host_ids.append(host.id) 393 394 # This is required to make `lock_time` for a host be exactly same 395 # between the master and a shard. 396 if update_data.get('locked', None) and 'lock_time' not in update_data: 397 update_data['lock_time'] = datetime.datetime.now() 398 for host in hosts: 399 host.update_object(update_data) 400 401 update_data.pop('force_modify_locking', None) 402 # Caution: Changing the filter from the original here. See docstring. 403 rpc_utils.run_rpc_on_multiple_hostnames( 404 'modify_hosts_local', affected_shard_hostnames, 405 host_filter_data={'id__in': affected_host_ids}, 406 update_data=update_data) 407 408 409def modify_hosts_local(host_filter_data, update_data): 410 """Modify attributes of hosts in local DB. 411 412 @param host_filter_data: Filters out which hosts to modify. 413 @param update_data: A dictionary with the changes to make to the hosts. 414 """ 415 for host in models.Host.query_objects(host_filter_data): 416 host.update_object(update_data) 417 418 419def add_labels_to_host(id, labels): 420 """Adds labels to a given host only in local DB. 421 422 @param id: id or hostname for a host. 423 @param labels: ids or names for labels. 424 """ 425 label_objs = models.Label.smart_get_bulk(labels) 426 models.Host.smart_get(id).labels.add(*label_objs) 427 428 429@rpc_utils.route_rpc_to_master 430def host_add_labels(id, labels): 431 """Adds labels to a given host. 432 433 @param id: id or hostname for a host. 434 @param labels: ids or names for labels. 435 436 @raises ValidationError: If adding more than one platform/board label. 437 """ 438 # Create the labels on the master/shards. 
439 for label in labels: 440 _create_label_everywhere(label, [id]) 441 442 label_objs = models.Label.smart_get_bulk(labels) 443 platforms = [label.name for label in label_objs if label.platform] 444 boards = [label.name for label in label_objs 445 if label.name.startswith('board:')] 446 if len(platforms) > 1 or not utils.board_labels_allowed(boards): 447 raise model_logic.ValidationError( 448 {'labels': ('Adding more than one platform label, or a list of ' 449 'non-compatible board labels.: %s %s' % 450 (', '.join(platforms), ', '.join(boards)))}) 451 452 host_obj = models.Host.smart_get(id) 453 if platforms: 454 models.Host.check_no_platform([host_obj]) 455 if boards: 456 models.Host.check_board_labels_allowed([host_obj], labels) 457 add_labels_to_host(id, labels) 458 459 rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False, 460 id=id, labels=labels) 461 462 463def remove_labels_from_host(id, labels): 464 """Removes labels from a given host only in local DB. 465 466 @param id: id or hostname for a host. 467 @param labels: ids or names for labels. 468 """ 469 label_objs = models.Label.smart_get_bulk(labels) 470 models.Host.smart_get(id).labels.remove(*label_objs) 471 472 473@rpc_utils.route_rpc_to_master 474def host_remove_labels(id, labels): 475 """Removes labels from a given host. 476 477 @param id: id or hostname for a host. 478 @param labels: ids or names for labels. 
479 """ 480 remove_labels_from_host(id, labels) 481 482 host_obj = models.Host.smart_get(id) 483 rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False, 484 id=id, labels=labels) 485 486 487def get_host_attribute(attribute, **host_filter_data): 488 """ 489 @param attribute: string name of attribute 490 @param host_filter_data: filter data to apply to Hosts to choose hosts to 491 act upon 492 """ 493 hosts = rpc_utils.get_host_query((), False, False, True, host_filter_data) 494 hosts = list(hosts) 495 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 496 'attribute_list') 497 host_attr_dicts = [] 498 for host_obj in hosts: 499 for attr_obj in host_obj.attribute_list: 500 if attr_obj.attribute == attribute: 501 host_attr_dicts.append(attr_obj.get_object_dict()) 502 return rpc_utils.prepare_for_serialization(host_attr_dicts) 503 504 505def set_host_attribute(attribute, value, **host_filter_data): 506 """ 507 @param attribute: string name of attribute 508 @param value: string, or None to delete an attribute 509 @param host_filter_data: filter data to apply to Hosts to choose hosts to 510 act upon 511 """ 512 assert host_filter_data # disallow accidental actions on all hosts 513 hosts = models.Host.query_objects(host_filter_data) 514 models.AclGroup.check_for_acl_violation_hosts(hosts) 515 for host in hosts: 516 host.set_or_delete_attribute(attribute, value) 517 518 # Master forwards this RPC to shards. 519 if not utils.is_shard(): 520 rpc_utils.fanout_rpc(hosts, 'set_host_attribute', False, 521 attribute=attribute, value=value, **host_filter_data) 522 523 524@rpc_utils.forward_single_host_rpc_to_shard 525def delete_host(id): 526 models.Host.smart_get(id).delete() 527 528 529def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 530 exclude_atomic_group_hosts=False, valid_only=True, 531 include_current_job=False, **filter_data): 532 """Get a list of dictionaries which contains the information of hosts. 
533 534 @param multiple_labels: match hosts in all of the labels given. Should 535 be a list of label names. 536 @param exclude_only_if_needed_labels: Exclude hosts with at least one 537 "only_if_needed" label applied. 538 @param exclude_atomic_group_hosts: Exclude hosts that have one or more 539 atomic group labels associated with them. 540 @param include_current_job: Set to True to include ids of currently running 541 job and special task. 542 """ 543 hosts = rpc_utils.get_host_query(multiple_labels, 544 exclude_only_if_needed_labels, 545 exclude_atomic_group_hosts, 546 valid_only, filter_data) 547 hosts = list(hosts) 548 models.Host.objects.populate_relationships(hosts, models.Label, 549 'label_list') 550 models.Host.objects.populate_relationships(hosts, models.AclGroup, 551 'acl_list') 552 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 553 'attribute_list') 554 host_dicts = [] 555 for host_obj in hosts: 556 host_dict = host_obj.get_object_dict() 557 host_dict['labels'] = [label.name for label in host_obj.label_list] 558 host_dict['platform'], host_dict['atomic_group'] = (rpc_utils. 
559 find_platform_and_atomic_group(host_obj)) 560 host_dict['acls'] = [acl.name for acl in host_obj.acl_list] 561 host_dict['attributes'] = dict((attribute.attribute, attribute.value) 562 for attribute in host_obj.attribute_list) 563 if include_current_job: 564 host_dict['current_job'] = None 565 host_dict['current_special_task'] = None 566 entries = models.HostQueueEntry.objects.filter( 567 host_id=host_dict['id'], active=True, complete=False) 568 if entries: 569 host_dict['current_job'] = ( 570 entries[0].get_object_dict()['job']) 571 tasks = models.SpecialTask.objects.filter( 572 host_id=host_dict['id'], is_active=True, is_complete=False) 573 if tasks: 574 host_dict['current_special_task'] = ( 575 '%d-%s' % (tasks[0].get_object_dict()['id'], 576 tasks[0].get_object_dict()['task'].lower())) 577 host_dicts.append(host_dict) 578 return rpc_utils.prepare_for_serialization(host_dicts) 579 580 581def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 582 exclude_atomic_group_hosts=False, valid_only=True, 583 **filter_data): 584 """ 585 Same parameters as get_hosts(). 586 587 @returns The number of matching hosts. 
588 """ 589 hosts = rpc_utils.get_host_query(multiple_labels, 590 exclude_only_if_needed_labels, 591 exclude_atomic_group_hosts, 592 valid_only, filter_data) 593 return hosts.count() 594 595 596# tests 597 598def add_test(name, test_type, path, author=None, dependencies=None, 599 experimental=True, run_verify=None, test_class=None, 600 test_time=None, test_category=None, description=None, 601 sync_count=1): 602 return models.Test.add_object(name=name, test_type=test_type, path=path, 603 author=author, dependencies=dependencies, 604 experimental=experimental, 605 run_verify=run_verify, test_time=test_time, 606 test_category=test_category, 607 sync_count=sync_count, 608 test_class=test_class, 609 description=description).id 610 611 612def modify_test(id, **data): 613 models.Test.smart_get(id).update_object(data) 614 615 616def delete_test(id): 617 models.Test.smart_get(id).delete() 618 619 620def get_tests(**filter_data): 621 return rpc_utils.prepare_for_serialization( 622 models.Test.list_objects(filter_data)) 623 624 625@_timer.decorate 626def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name): 627 """Gets the counts of all passed and failed tests from the matching jobs. 628 629 @param job_name_prefix: Name prefix of the jobs to get the summary from, e.g., 630 'butterfly-release/R40-6457.21.0/bvt-cq/'. 631 @param label_name: Label that must be set in the jobs, e.g., 632 'cros-version:butterfly-release/R40-6457.21.0'. 633 634 @returns A summary of the counts of all the passed and failed tests. 
635 """ 636 job_ids = list(models.Job.objects.filter( 637 name__startswith=job_name_prefix, 638 dependency_labels__name=label_name).values_list( 639 'pk', flat=True)) 640 summary = {'passed': 0, 'failed': 0} 641 if not job_ids: 642 return summary 643 644 counts = (tko_models.TestView.objects.filter( 645 afe_job_id__in=job_ids).exclude( 646 test_name='SERVER_JOB').exclude( 647 test_name__startswith='CLIENT_JOB').values( 648 'status').annotate( 649 count=Count('status'))) 650 for status in counts: 651 if status['status'] == 'GOOD': 652 summary['passed'] += status['count'] 653 else: 654 summary['failed'] += status['count'] 655 return summary 656 657 658# profilers 659 660def add_profiler(name, description=None): 661 return models.Profiler.add_object(name=name, description=description).id 662 663 664def modify_profiler(id, **data): 665 models.Profiler.smart_get(id).update_object(data) 666 667 668def delete_profiler(id): 669 models.Profiler.smart_get(id).delete() 670 671 672def get_profilers(**filter_data): 673 return rpc_utils.prepare_for_serialization( 674 models.Profiler.list_objects(filter_data)) 675 676 677# users 678 679def add_user(login, access_level=None): 680 return models.User.add_object(login=login, access_level=access_level).id 681 682 683def modify_user(id, **data): 684 models.User.smart_get(id).update_object(data) 685 686 687def delete_user(id): 688 models.User.smart_get(id).delete() 689 690 691def get_users(**filter_data): 692 return rpc_utils.prepare_for_serialization( 693 models.User.list_objects(filter_data)) 694 695 696# acl groups 697 698def add_acl_group(name, description=None): 699 group = models.AclGroup.add_object(name=name, description=description) 700 group.users.add(models.User.current_user()) 701 return group.id 702 703 704def modify_acl_group(id, **data): 705 group = models.AclGroup.smart_get(id) 706 group.check_for_acl_violation_acl_group() 707 group.update_object(data) 708 group.add_current_user_if_empty() 709 710 711def 
acl_group_add_users(id, users): 712 group = models.AclGroup.smart_get(id) 713 group.check_for_acl_violation_acl_group() 714 users = models.User.smart_get_bulk(users) 715 group.users.add(*users) 716 717 718def acl_group_remove_users(id, users): 719 group = models.AclGroup.smart_get(id) 720 group.check_for_acl_violation_acl_group() 721 users = models.User.smart_get_bulk(users) 722 group.users.remove(*users) 723 group.add_current_user_if_empty() 724 725 726def acl_group_add_hosts(id, hosts): 727 group = models.AclGroup.smart_get(id) 728 group.check_for_acl_violation_acl_group() 729 hosts = models.Host.smart_get_bulk(hosts) 730 group.hosts.add(*hosts) 731 group.on_host_membership_change() 732 733 734def acl_group_remove_hosts(id, hosts): 735 group = models.AclGroup.smart_get(id) 736 group.check_for_acl_violation_acl_group() 737 hosts = models.Host.smart_get_bulk(hosts) 738 group.hosts.remove(*hosts) 739 group.on_host_membership_change() 740 741 742def delete_acl_group(id): 743 models.AclGroup.smart_get(id).delete() 744 745 746def get_acl_groups(**filter_data): 747 acl_groups = models.AclGroup.list_objects(filter_data) 748 for acl_group in acl_groups: 749 acl_group_obj = models.AclGroup.objects.get(id=acl_group['id']) 750 acl_group['users'] = [user.login 751 for user in acl_group_obj.users.all()] 752 acl_group['hosts'] = [host.hostname 753 for host in acl_group_obj.hosts.all()] 754 return rpc_utils.prepare_for_serialization(acl_groups) 755 756 757# jobs 758 759def generate_control_file(tests=(), profilers=(), 760 client_control_file='', use_container=False, 761 profile_only=None, db_tests=True, 762 test_source_build=None): 763 """ 764 Generates a client-side control file to run tests. 765 766 @param tests List of tests to run. See db_tests for more information. 767 @param profilers List of profilers to activate during the job. 768 @param client_control_file The contents of a client-side control file to 769 run at the end of all tests. 
If this is supplied, all tests must be 770 client side. 771 TODO: in the future we should support server control files directly 772 to wrap with a kernel. That'll require changing the parameter 773 name and adding a boolean to indicate if it is a client or server 774 control file. 775 @param use_container unused argument today. TODO: Enable containers 776 on the host during a client side test. 777 @param profile_only A boolean that indicates what default profile_only 778 mode to use in the control file. Passing None will generate a 779 control file that does not explcitly set the default mode at all. 780 @param db_tests: if True, the test object can be found in the database 781 backing the test model. In this case, tests is a tuple 782 of test IDs which are used to retrieve the test objects 783 from the database. If False, tests is a tuple of test 784 dictionaries stored client-side in the AFE. 785 @param test_source_build: Build to be used to retrieve test code. Default 786 to None. 787 788 @returns a dict with the following keys: 789 control_file: str, The control file text. 790 is_server: bool, is the control file a server-side control file? 791 synch_count: How many machines the job uses per autoserv execution. 792 synch_count == 1 means the job is asynchronous. 793 dependencies: A list of the names of labels on which the job depends. 
794 """ 795 if not tests and not client_control_file: 796 return dict(control_file='', is_server=False, synch_count=1, 797 dependencies=[]) 798 799 cf_info, test_objects, profiler_objects = ( 800 rpc_utils.prepare_generate_control_file(tests, profilers, 801 db_tests)) 802 cf_info['control_file'] = control_file.generate_control( 803 tests=test_objects, profilers=profiler_objects, 804 is_server=cf_info['is_server'], 805 client_control_file=client_control_file, profile_only=profile_only, 806 test_source_build=test_source_build) 807 return cf_info 808 809 810def create_parameterized_job(name, priority, test, parameters, kernel=None, 811 label=None, profilers=(), profiler_parameters=None, 812 use_container=False, profile_only=None, 813 upload_kernel_config=False, hosts=(), 814 meta_hosts=(), one_time_hosts=(), 815 atomic_group_name=None, synch_count=None, 816 is_template=False, timeout=None, 817 timeout_mins=None, max_runtime_mins=None, 818 run_verify=False, email_list='', dependencies=(), 819 reboot_before=None, reboot_after=None, 820 parse_failed_repair=None, hostless=False, 821 keyvals=None, drone_set=None, run_reset=True, 822 require_ssp=None): 823 """ 824 Creates and enqueues a parameterized job. 825 826 Most parameters a combination of the parameters for generate_control_file() 827 and create_job(), with the exception of: 828 829 @param test name or ID of the test to run 830 @param parameters a map of parameter name -> 831 tuple of (param value, param type) 832 @param profiler_parameters a dictionary of parameters for the profilers: 833 key: profiler name 834 value: dict of param name -> tuple of 835 (param value, 836 param type) 837 """ 838 # Save the values of the passed arguments here. What we're going to do with 839 # them is pass them all to rpc_utils.get_create_job_common_args(), which 840 # will extract the subset of these arguments that apply for 841 # rpc_utils.create_job_common(), which we then pass in to that function. 
842 args = locals() 843 844 # Set up the parameterized job configs 845 test_obj = models.Test.smart_get(test) 846 control_type = test_obj.test_type 847 848 try: 849 label = models.Label.smart_get(label) 850 except models.Label.DoesNotExist: 851 label = None 852 853 kernel_objs = models.Kernel.create_kernels(kernel) 854 profiler_objs = [models.Profiler.smart_get(profiler) 855 for profiler in profilers] 856 857 parameterized_job = models.ParameterizedJob.objects.create( 858 test=test_obj, label=label, use_container=use_container, 859 profile_only=profile_only, 860 upload_kernel_config=upload_kernel_config) 861 parameterized_job.kernels.add(*kernel_objs) 862 863 for profiler in profiler_objs: 864 parameterized_profiler = models.ParameterizedJobProfiler.objects.create( 865 parameterized_job=parameterized_job, 866 profiler=profiler) 867 profiler_params = profiler_parameters.get(profiler.name, {}) 868 for name, (value, param_type) in profiler_params.iteritems(): 869 models.ParameterizedJobProfilerParameter.objects.create( 870 parameterized_job_profiler=parameterized_profiler, 871 parameter_name=name, 872 parameter_value=value, 873 parameter_type=param_type) 874 875 try: 876 for parameter in test_obj.testparameter_set.all(): 877 if parameter.name in parameters: 878 param_value, param_type = parameters.pop(parameter.name) 879 parameterized_job.parameterizedjobparameter_set.create( 880 test_parameter=parameter, parameter_value=param_value, 881 parameter_type=param_type) 882 883 if parameters: 884 raise Exception('Extra parameters remain: %r' % parameters) 885 886 return rpc_utils.create_job_common( 887 parameterized_job=parameterized_job.id, 888 control_type=control_type, 889 **rpc_utils.get_create_job_common_args(args)) 890 except: 891 parameterized_job.delete() 892 raise 893 894 895def create_job_page_handler(name, priority, control_file, control_type, 896 image=None, hostless=False, firmware_rw_build=None, 897 firmware_ro_build=None, test_source_build=None, 898 
is_cloning=False, **kwargs): 899 """\ 900 Create and enqueue a job. 901 902 @param name name of this job 903 @param priority Integer priority of this job. Higher is more important. 904 @param control_file String contents of the control file. 905 @param control_type Type of control file, Client or Server. 906 @param image: ChromeOS build to be installed in the dut. Default to None. 907 @param firmware_rw_build: Firmware build to update RW firmware. Default to 908 None, i.e., RW firmware will not be updated. 909 @param firmware_ro_build: Firmware build to update RO firmware. Default to 910 None, i.e., RO firmware will not be updated. 911 @param test_source_build: Build to be used to retrieve test code. Default 912 to None. 913 @param is_cloning: True if creating a cloning job. 914 @param kwargs extra args that will be required by create_suite_job or 915 create_job. 916 917 @returns The created Job id number. 918 """ 919 if is_cloning: 920 logging.info('Start to clone a new job') 921 # When cloning a job, hosts and meta_hosts should not exist together, 922 # which would cause host-scheduler to schedule two hqe jobs to one host 923 # at the same time, and crash itself. Clear meta_hosts for this case. 
        if kwargs.get('hosts') and kwargs.get('meta_hosts'):
            kwargs['meta_hosts'] = []
    else:
        logging.info('Start to create a new job')
    # NOTE: the `control_file` parameter shadows the imported control_file
    # module inside this function; only the string contents are used here.
    control_file = rpc_utils.encode_ascii(control_file)
    if not control_file:
        raise model_logic.ValidationError({
                'control_file' : "Control file cannot be empty"})

    # A hostless job with an image is a suite job; everything else goes
    # through the regular create_job() path below.
    if image and hostless:
        builds = {}
        builds[provision.CROS_VERSION_PREFIX] = image
        if firmware_rw_build:
            builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
        if firmware_ro_build:
            builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
        return site_rpc_interface.create_suite_job(
                name=name, control_file=control_file, priority=priority,
                builds=builds, test_source_build=test_source_build,
                is_cloning=is_cloning, **kwargs)
    return create_job(name, priority, control_file, control_type, image=image,
                      hostless=hostless, is_cloning=is_cloning, **kwargs)


@rpc_utils.route_rpc_to_master
def create_job(name, priority, control_file, control_type,
               hosts=(), meta_hosts=(), one_time_hosts=(),
               atomic_group_name=None, synch_count=None, is_template=False,
               timeout=None, timeout_mins=None, max_runtime_mins=None,
               run_verify=False, email_list='', dependencies=(),
               reboot_before=None, reboot_after=None, parse_failed_repair=None,
               hostless=False, keyvals=None, drone_set=None, image=None,
               parent_job_id=None, test_retry=0, run_reset=True,
               require_ssp=None, args=(), is_cloning=False, **kwargs):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Integer priority of this job. Higher is more important.
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param synch_count How many machines the job uses per autoserv execution.
        synch_count == 1 means the job is asynchronous.  If an atomic group is
        given this value is treated as a minimum.
    @param is_template If true then create a template job.
    @param timeout Hours after this call returns until the job times out.
    @param timeout_mins Minutes after this call returns until the job times
                        out.
    @param max_runtime_mins Minutes from job starting time until job times out.
    @param run_verify Should the host be verified before running the test?
    @param email_list String containing emails to mail when the job is done.
    @param dependencies List of label names on which this job depends.
    @param reboot_before Never, If dirty, or Always
    @param reboot_after Never, If all tests passed, or Always
    @param parse_failed_repair if true, results of failed repairs launched by
                               this job will be parsed as part of the job.
    @param hostless if true, create a hostless job
    @param keyvals dict of keyvals to associate with the job
    @param hosts List of hosts to run job on.
    @param meta_hosts List where each entry is a label name, and for each entry
                      one host will be chosen from that label to run the job on.
    @param one_time_hosts List of hosts not in the database to run the job on.
    @param atomic_group_name The name of an atomic group to schedule the job on.
    @param drone_set The name of the drone set to run this test on.
    @param image OS image to install before running job.
    @param parent_job_id id of a job considered to be parent of created job.
    @param test_retry Number of times to retry test if the test did not
                      complete successfully. (optional, default: 0)
    @param run_reset Should the host be reset before running the test?
    @param require_ssp Set to True to require server-side packaging to run the
                       test. If it's set to None, drone will still try to run
                       the server side with server-side packaging. If the
                       autotest-server package doesn't exist for the build or
                       image is not set, drone will run the test without server-
                       side packaging. Default is None.
    @param args A list of args to be injected into control file.
    @param is_cloning: True if creating a cloning job.
    @param kwargs extra keyword args. NOT USED.

    @returns The created Job id number.
    """
    if args:
        control_file = tools.inject_vars({'args': args}, control_file)

    if image is None:
        return rpc_utils.create_job_common(
                **rpc_utils.get_create_job_common_args(locals()))

    # Translate the image name, in case it's a relative build name.
    ds = dev_server.ImageServer.resolve(image)
    image = ds.translate(image)

    # When image is supplied use a known parameterized test already in the
    # database to pass the OS image path from the front end, through the
    # scheduler, and finally to autoserv as the --image parameter.

    # The test autoupdate_ParameterizedJob is in afe_autotests and used to
    # instantiate a Test object and from there a ParameterizedJob.
    known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob')
    known_parameterized_job = models.ParameterizedJob.objects.create(
            test=known_test_obj)

    # autoupdate_ParameterizedJob has a single parameter, the image parameter,
    # stored in the table afe_test_parameters. We retrieve and set this
    # instance of the parameter to the OS image path.
    image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj,
                                                           name='image')
    known_parameterized_job.parameterizedjobparameter_set.create(
            test_parameter=image_parameter, parameter_value=image,
            parameter_type='string')

    # TODO(crbug.com/502638): save firmware build etc to parameterized_job.

    # By passing a parameterized_job to create_job_common the job entry in
    # the afe_jobs table will have the field parameterized_job_id set.
1039 # The scheduler uses this id in the afe_parameterized_jobs table to 1040 # match this job to our known test, and then with the 1041 # afe_parameterized_job_parameters table to get the actual image path. 1042 return rpc_utils.create_job_common( 1043 parameterized_job=known_parameterized_job.id, 1044 **rpc_utils.get_create_job_common_args(locals())) 1045 1046 1047def abort_host_queue_entries(**filter_data): 1048 """\ 1049 Abort a set of host queue entries. 1050 1051 @return: A list of dictionaries, each contains information 1052 about an aborted HQE. 1053 """ 1054 query = models.HostQueueEntry.query_objects(filter_data) 1055 1056 # Dont allow aborts on: 1057 # 1. Jobs that have already completed (whether or not they were aborted) 1058 # 2. Jobs that we have already been aborted (but may not have completed) 1059 query = query.filter(complete=False).filter(aborted=False) 1060 models.AclGroup.check_abort_permissions(query) 1061 host_queue_entries = list(query.select_related()) 1062 rpc_utils.check_abort_synchronous_jobs(host_queue_entries) 1063 1064 models.HostQueueEntry.abort_host_queue_entries(host_queue_entries) 1065 hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id, 1066 'Job name': hqe.job.name} for hqe in host_queue_entries] 1067 return hqe_info 1068 1069 1070def abort_special_tasks(**filter_data): 1071 """\ 1072 Abort the special task, or tasks, specified in the filter. 1073 """ 1074 query = models.SpecialTask.query_objects(filter_data) 1075 special_tasks = query.filter(is_active=True) 1076 for task in special_tasks: 1077 task.abort() 1078 1079 1080def _call_special_tasks_on_hosts(task, hosts): 1081 """\ 1082 Schedules a set of hosts for a special task. 1083 1084 @returns A list of hostnames that a special task was created for. 
1085 """ 1086 models.AclGroup.check_for_acl_violation_hosts(hosts) 1087 shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts) 1088 if shard_host_map and not utils.is_shard(): 1089 raise ValueError('The following hosts are on shards, please ' 1090 'follow the link to the shards and create jobs ' 1091 'there instead. %s.' % shard_host_map) 1092 for host in hosts: 1093 models.SpecialTask.schedule_special_task(host, task) 1094 return list(sorted(host.hostname for host in hosts)) 1095 1096 1097def _forward_special_tasks_on_hosts(task, rpc, **filter_data): 1098 """Forward special tasks to corresponding shards. 1099 1100 For master, when special tasks are fired on hosts that are sharded, 1101 forward the RPC to corresponding shards. 1102 1103 For shard, create special task records in local DB. 1104 1105 @param task: Enum value of frontend.afe.models.SpecialTask.Task 1106 @param rpc: RPC name to forward. 1107 @param filter_data: Filter keywords to be used for DB query. 1108 1109 @return: A list of hostnames that a special task was created for. 1110 """ 1111 hosts = models.Host.query_objects(filter_data) 1112 shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts, rpc_hostnames=True) 1113 1114 # Filter out hosts on a shard from those on the master, forward 1115 # rpcs to the shard with an additional hostname__in filter, and 1116 # create a local SpecialTask for each remaining host. 1117 if shard_host_map and not utils.is_shard(): 1118 hosts = [h for h in hosts if h.shard is None] 1119 for shard, hostnames in shard_host_map.iteritems(): 1120 1121 # The main client of this module is the frontend website, and 1122 # it invokes it with an 'id' or an 'id__in' filter. Regardless, 1123 # the 'hostname' filter should narrow down the list of hosts on 1124 # each shard even though we supply all the ids in filter_data. 1125 # This method uses hostname instead of id because it fits better 1126 # with the overall architecture of redirection functions in 1127 # rpc_utils. 
            shard_filter = filter_data.copy()
            shard_filter['hostname__in'] = hostnames
            rpc_utils.run_rpc_on_multiple_hostnames(
                    rpc, [shard], **shard_filter)

    # There is a race condition here if someone assigns a shard to one of these
    # hosts before we create the task. The host will stay on the master if:
    # 1. The host is not Ready
    # 2. The host is Ready but has a task
    # But if the host is Ready and doesn't have a task yet, it will get sent
    # to the shard as we're creating a task here.

    # Given that we only rarely verify Ready hosts it isn't worth putting this
    # entire method in a transaction. The worst case scenario is that we have
    # a verify running on a Ready host while the shard is using it, if the
    # verify fails no subsequent tasks will be created against the host on the
    # master, and verifies are safe enough that this is OK.
    return _call_special_tasks_on_hosts(task, hosts)


def reverify_hosts(**filter_data):
    """\
    Schedules a set of hosts for verify.

    @returns A list of hostnames that a verify task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)


def repair_hosts(**filter_data):
    """\
    Schedules a set of hosts for repair.

    @returns A list of hostnames that a repair task was created for.
    """
    return _forward_special_tasks_on_hosts(
            models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data)


def get_jobs(not_yet_run=False, running=False, finished=False,
             suite=False, sub=False, standalone=False, **filter_data):
    """\
    Extra status filter args for get_jobs:
    -not_yet_run: Include only jobs that have not yet started running.
    -running: Include only jobs that have started running but for which not
        all hosts have completed.
    -finished: Include only jobs for which all hosts have completed (or
        aborted).

    Extra type filter args for get_jobs:
    -suite: Include only jobs with child jobs.
    -sub: Include only jobs with a parent job.
    -standalone: Include only jobs with no child or parent jobs.
    At most one of these three fields should be specified.
    """
    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                    running,
                                                    finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    job_dicts = []
    jobs = list(models.Job.query_objects(filter_data))
    # Batch-load dependencies and keyvals to avoid a per-job query.
    models.Job.objects.populate_relationships(jobs, models.Label,
                                              'dependencies')
    models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
    for job in jobs:
        job_dict = job.get_object_dict()
        job_dict['dependencies'] = ','.join(label.name
                                            for label in job.dependencies)
        job_dict['keyvals'] = dict((keyval.key, keyval.value)
                                   for keyval in job.keyvals)
        if job.parameterized_job:
            job_dict['image'] = get_parameterized_autoupdate_image_url(job)
        job_dicts.append(job_dict)
    return rpc_utils.prepare_for_serialization(job_dicts)


def get_num_jobs(not_yet_run=False, running=False, finished=False,
                 suite=False, sub=False, standalone=False,
                 **filter_data):
    """\
    See get_jobs() for documentation of extra filter parameters.
    """
    extra_args = rpc_utils.extra_job_status_filters(not_yet_run,
                                                    running,
                                                    finished)
    filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args,
                                                                 suite,
                                                                 sub,
                                                                 standalone)
    return models.Job.query_count(filter_data)


def get_jobs_summary(**filter_data):
    """\
    Like get_jobs(), but adds 'status_counts' and 'result_counts' field.

    'status_counts' field is a dictionary mapping status strings to the number
    of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}.

    'result_counts' field is piped to tko's rpc_interface and has the return
    format specified under get_group_counts.
    """
    jobs = get_jobs(**filter_data)
    ids = [job['id'] for job in jobs]
    # Fetch status counts for all jobs in one call rather than per job.
    all_status_counts = models.Job.objects.get_status_counts(ids)
    for job in jobs:
        job['status_counts'] = all_status_counts[job['id']]
        job['result_counts'] = tko_rpc_interface.get_status_counts(
                ['afe_job_id', 'afe_job_id'],
                header_groups=[['afe_job_id'], ['afe_job_id']],
                **{'afe_job_id': job['id']})
    return rpc_utils.prepare_for_serialization(jobs)


def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
    """\
    Retrieves all the information needed to clone a job.

    @param id: id or name of the job to clone.
    @param preserve_metahosts: Passed through to rpc_utils.get_job_info();
            presumably controls whether meta hosts are kept as labels rather
            than resolved hosts -- confirm in rpc_utils.
    @param queue_entry_filter_data: Extra filter for the job's queue entries.
    """
    job = models.Job.objects.get(id=id)
    job_info = rpc_utils.get_job_info(job,
                                      preserve_metahosts,
                                      queue_entry_filter_data)

    host_dicts = []
    for host in job_info['hosts']:
        host_dict = get_hosts(id=host.id)[0]
        # The platform label is reported separately; strip it from the
        # generic label list shown in the clone UI.
        other_labels = host_dict['labels']
        if host_dict['platform']:
            other_labels.remove(host_dict['platform'])
        host_dict['other_labels'] = ', '.join(other_labels)
        host_dicts.append(host_dict)

    for host in job_info['one_time_hosts']:
        host_dict = dict(hostname=host.hostname,
                         id=host.id,
                         platform='(one-time host)',
                         locked_text='')
        host_dicts.append(host_dict)

    # convert keys from Label objects to strings (names of labels)
    meta_host_counts = dict((meta_host.name, count) for meta_host, count
                            in job_info['meta_host_counts'].iteritems())

    info = dict(job=job.get_object_dict(),
                meta_host_counts=meta_host_counts,
                hosts=host_dicts)
    info['job']['dependencies'] = job_info['dependencies']
    if job_info['atomic_group']:
        info['atomic_group_name'] = (job_info['atomic_group']).name
    else:
        info['atomic_group_name'] = None
    info['hostless'] = job_info['hostless']
    info['drone_set'] = job.drone_set and job.drone_set.name

    image = _get_image_for_job(job, job_info['hostless'])
    if image:
        info['job']['image'] = image

    return rpc_utils.prepare_for_serialization(info)
1308 """ 1309 image = None 1310 if job.parameterized_job: 1311 image = get_parameterized_autoupdate_image_url(job) 1312 else: 1313 keyvals = job.keyval_dict() 1314 image = keyvals.get('build') 1315 if not image: 1316 value = keyvals.get('builds') 1317 builds = None 1318 if isinstance(value, dict): 1319 builds = value 1320 elif isinstance(value, basestring): 1321 builds = ast.literal_eval(value) 1322 if builds: 1323 image = builds.get('cros-version') 1324 if not image and hostless and job.control_file: 1325 try: 1326 control_obj = control_data.parse_control_string( 1327 job.control_file) 1328 if hasattr(control_obj, 'build'): 1329 image = getattr(control_obj, 'build') 1330 if not image and hasattr(control_obj, 'builds'): 1331 builds = getattr(control_obj, 'builds') 1332 image = builds.get('cros-version') 1333 except: 1334 logging.warning('Failed to parse control file for job: %s', 1335 job.name) 1336 return image 1337 1338 1339def get_host_queue_entries(start_time=None, end_time=None, **filter_data): 1340 """\ 1341 @returns A sequence of nested dictionaries of host and job information. 1342 """ 1343 filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 1344 'started_on__lte', 1345 start_time, 1346 end_time, 1347 **filter_data) 1348 return rpc_utils.prepare_rows_as_nested_dicts( 1349 models.HostQueueEntry.query_objects(filter_data), 1350 ('host', 'atomic_group', 'job')) 1351 1352 1353def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data): 1354 """\ 1355 Get the number of host queue entries associated with this job. 1356 """ 1357 filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 1358 'started_on__lte', 1359 start_time, 1360 end_time, 1361 **filter_data) 1362 return models.HostQueueEntry.query_count(filter_data) 1363 1364 1365def get_hqe_percentage_complete(**filter_data): 1366 """ 1367 Computes the fraction of host queue entries matching the given filter data 1368 that are complete. 
    """
    query = models.HostQueueEntry.query_objects(filter_data)
    complete_count = query.filter(complete=True).count()
    total_count = query.count()
    # An empty match is reported as fully complete.
    if total_count == 0:
        return 1
    return float(complete_count) / total_count


# special tasks

def get_special_tasks(**filter_data):
    """Get special task entries from the local database.

    Query the special tasks table for tasks matching the given
    `filter_data`, and return a list of the results. No attempt is
    made to forward the call to shards; the buck will stop here.
    The caller is expected to know the target shard for such reasons
    as:
      * The caller is a service (such as gs_offloader) configured
        to operate on behalf of one specific shard, and no other.
      * The caller has a host as a parameter, and knows that this is
        the shard assigned to that host.

    @param filter_data  Filter keywords to pass to the underlying
                        database query.

    """
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.SpecialTask.query_objects(filter_data),
            ('host', 'queue_entry'))


def get_host_special_tasks(host_id, **filter_data):
    """Get special task entries for a given host.

    Query the special tasks table for tasks that ran on the host
    given by `host_id` and matching the given `filter_data`.
    Return a list of the results.  If the host is assigned to a
    shard, forward this call to that shard.

    @param host_id      Id in the database of the target host.
    @param filter_data  Filter keywords to pass to the underlying
                        database query.

    """
    # Retrieve host data even if the host is in an invalid state.
    host = models.Host.smart_get(host_id, False)
    if not host.shard:
        return get_special_tasks(host_id=host_id, **filter_data)
    else:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable.  So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
        return shard_afe.run('get_special_tasks',
                             host_id=host_id, **filter_data)


def get_num_special_tasks(**kwargs):
    """Get the number of special task entries from the local database.

    Query the special tasks table for tasks matching the given 'kwargs',
    and return the number of the results. No attempt is made to forward
    the call to shards; the buck will stop here.

    @param kwargs    Filter keywords to pass to the underlying database query.

    """
    return models.SpecialTask.query_count(kwargs)


def get_host_num_special_tasks(host, **kwargs):
    """Get the number of special task entries for a given host.

    Query the special tasks table for tasks that ran on the host
    given by 'host' and matching the given 'kwargs', and return the
    count.  If the host is assigned to a shard, forward this call to
    that shard.

    @param host      id or name of a host. More often a hostname.
    @param kwargs    Filter keywords to pass to the underlying database query.

    """
    # Retrieve host data even if the host is in an invalid state.
    host_model = models.Host.smart_get(host, False)
    if not host_model.shard:
        return get_num_special_tasks(host=host, **kwargs)
    else:
        shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname())
        return shard_afe.run('get_num_special_tasks', host=host, **kwargs)


def get_status_task(host_id, end_time):
    """Get the "status task" for a host from the local shard.

    Returns a single special task representing the given host's
    "status task".  The status task is a completed special task that
    identifies whether the corresponding host was working or broken
    when it completed.  A successful task indicates a working host;
    a failed task indicates broken.

    This call will not be forwarded to a shard; the receiving server
    must be the shard that owns the host.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    tasklist = rpc_utils.prepare_rows_as_nested_dicts(
            status_history.get_status_task(host_id, end_time),
            ('host', 'queue_entry'))
    return tasklist[0] if tasklist else None


def get_host_status_task(host_id, end_time):
    """Get the "status task" for a host from its owning shard.

    Finds the given host's owning shard, and forwards to it a call
    to `get_status_task()` (see above).

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the host's status.

    @return A single task; its status (successful or not)
            corresponds to the status of the host (working or
            broken) at the given time.  If no task is found, return
            `None`.

    """
    host = models.Host.smart_get(host_id)
    if not host.shard:
        return get_status_task(host_id, end_time)
    else:
        # The return values from AFE methods are post-processed
        # objects that aren't JSON-serializable.  So, we have to
        # call AFE.run() to get the raw, serializable output from
        # the shard.
        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
        return shard_afe.run('get_status_task',
                             host_id=host_id, end_time=end_time)


def get_host_diagnosis_interval(host_id, end_time, success):
    """Find a "diagnosis interval" for a given host.

    A "diagnosis interval" identifies a start and end time where
    the host went from "working" to "broken", or vice versa.  The
    interval's starting time is the starting time of the last status
    task with the old status; the end time is the finish time of the
    first status task with the new status.

    This routine finds the most recent diagnosis interval for the
    given host prior to `end_time`, with a starting status matching
    `success`.  If `success` is true, the interval will start with a
    successful status task; if false the interval will start with a
    failed status task.

    @param host_id      Id in the database of the target host.
    @param end_time     Time reference for the diagnosis interval.
    @param success      Whether the diagnosis interval should start
                        with a successful or failed status task.

    @return A list of two strings.  The first is the timestamp for
            the beginning of the interval; the second is the
            timestamp for the end.  If the host has never changed
            state, the list is empty.

    """
    host = models.Host.smart_get(host_id)
    # Answer locally when the host is unsharded or we already are the shard;
    # otherwise forward to the owning shard.
    if not host.shard or utils.is_shard():
        return status_history.get_diagnosis_interval(
                host_id, end_time, success)
    else:
        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
        return shard_afe.get_host_diagnosis_interval(
                host_id, end_time, success)


# support for host detail view

def get_host_queue_entries_and_special_tasks(host, query_start=None,
                                             query_limit=None, start_time=None,
                                             end_time=None):
    """
    @returns an interleaved list of HostQueueEntries and SpecialTasks,
            in approximate run order.  each dict contains keys for type, host,
            job, status, started_on, execution_path, and ID.
    """
    total_limit = None
    if query_limit is not None:
        # NOTE(review): assumes query_start is also supplied whenever
        # query_limit is; a None query_start here would raise TypeError --
        # confirm against callers.
        total_limit = query_start + query_limit
    # Over-fetch up to query_start + query_limit rows from each source so the
    # interleaved list can be sliced below.
    filter_data_common = {'host': host,
                          'query_limit': total_limit,
                          'sort_by': ['-id']}

    filter_data_special_tasks = rpc_utils.inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **filter_data_common)

    queue_entries = get_host_queue_entries(
            start_time, end_time, **filter_data_common)
    special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)

    interleaved_entries = rpc_utils.interleave_entries(queue_entries,
                                                       special_tasks)
    if query_start is not None:
        interleaved_entries = interleaved_entries[query_start:]
    if query_limit is not None:
        interleaved_entries = interleaved_entries[:query_limit]
    return rpc_utils.prepare_host_queue_entries_and_special_tasks(
            interleaved_entries, queue_entries)


def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
                                                end_time=None):
    """Return the number of queue entries plus special tasks for a host,
    optionally restricted to the given time window."""
    filter_data_common = {'host': host}

    filter_data_queue_entries, filter_data_special_tasks = (
            rpc_utils.inject_times_to_hqe_special_tasks_filters(
                    filter_data_common, start_time, end_time))

    return (models.HostQueueEntry.query_count(filter_data_queue_entries)
            + get_host_num_special_tasks(**filter_data_special_tasks))


# recurring run

def get_recurring(**filter_data):
    """Return recurring runs matching the filter, as nested dictionaries."""
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.RecurringRun.query_objects(filter_data),
            ('job', 'owner'))


def get_num_recurring(**filter_data):
    """Return the number of recurring runs matching the filter."""
    return models.RecurringRun.query_count(filter_data)


def delete_recurring_runs(**filter_data):
    """Delete all recurring runs matching the filter."""
    to_delete = models.RecurringRun.query_objects(filter_data)
    to_delete.delete()


def create_recurring_run(job_id, start_date, loop_period, loop_count):
    """Create a recurring run of an existing job, owned by the current user."""
    owner = models.User.current_user().login
    job = models.Job.objects.get(id=job_id)
    return job.create_recurring_job(start_date=start_date,
                                    loop_period=loop_period,
                                    loop_count=loop_count,
                                    owner=owner)


# other

def echo(data=""):
    """\
    Returns a passed in string. For doing a basic test to see if RPC calls
    can successfully be made.
    """
    return data


def get_motd():
    """\
    Returns the message of the day as a string.
    """
    return rpc_utils.get_motd()


def get_static_data():
    """\
    Returns a dictionary containing a bunch of data that shouldn't change
    often and is otherwise inaccessible.  This includes:

    priorities: List of job priority choices.
    default_priority: Default priority value for new jobs.
    users: Sorted list of all users.
    labels: Sorted list of labels that do not start with 'cros-version' and
            'fw-version'.
    atomic_groups: Sorted list of all atomic groups.
    tests: Sorted list of all tests.
    profilers: Sorted list of all profilers.
    current_user: Logged-in username.
    host_statuses: Sorted list of possible Host statuses.
    job_statuses: Sorted list of possible HostQueueEntry statuses.
    job_timeout_default: The default job timeout length in minutes.
    parse_failed_repair_default: Default value for the parse_failed_repair job
                                 option.
    reboot_before_options: A list of valid RebootBefore string enums.
    reboot_after_options: A list of valid RebootAfter string enums.
    motd: Server's message of the day.
    status_dictionary: A mapping from one word job status names to a more
                       informative description.
1668 """ 1669 1670 job_fields = models.Job.get_field_dict() 1671 default_drone_set_name = models.DroneSet.default_drone_set_name() 1672 drone_sets = ([default_drone_set_name] + 1673 sorted(drone_set.name for drone_set in 1674 models.DroneSet.objects.exclude( 1675 name=default_drone_set_name))) 1676 1677 result = {} 1678 result['priorities'] = priorities.Priority.choices() 1679 default_priority = priorities.Priority.DEFAULT 1680 result['default_priority'] = 'Default' 1681 result['max_schedulable_priority'] = priorities.Priority.DEFAULT 1682 result['users'] = get_users(sort_by=['login']) 1683 1684 label_exclude_filters = [{'name__startswith': 'cros-version'}, 1685 {'name__startswith': 'fw-version'}, 1686 {'name__startswith': 'fwrw-version'}, 1687 {'name__startswith': 'fwro-version'}, 1688 {'name__startswith': 'ab-version'}, 1689 {'name__startswith': 'testbed-version'}] 1690 result['labels'] = get_labels( 1691 label_exclude_filters, 1692 sort_by=['-platform', 'name']) 1693 1694 result['atomic_groups'] = get_atomic_groups(sort_by=['name']) 1695 result['tests'] = get_tests(sort_by=['name']) 1696 result['profilers'] = get_profilers(sort_by=['name']) 1697 result['current_user'] = rpc_utils.prepare_for_serialization( 1698 models.User.current_user().get_object_dict()) 1699 result['host_statuses'] = sorted(models.Host.Status.names) 1700 result['job_statuses'] = sorted(models.HostQueueEntry.Status.names) 1701 result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS 1702 result['job_max_runtime_mins_default'] = ( 1703 models.Job.DEFAULT_MAX_RUNTIME_MINS) 1704 result['parse_failed_repair_default'] = bool( 1705 models.Job.DEFAULT_PARSE_FAILED_REPAIR) 1706 result['reboot_before_options'] = model_attributes.RebootBefore.names 1707 result['reboot_after_options'] = model_attributes.RebootAfter.names 1708 result['motd'] = rpc_utils.get_motd() 1709 result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled() 1710 result['drone_sets'] = drone_sets 1711 
result['parameterized_jobs'] = models.Job.parameterized_jobs_enabled() 1712 1713 result['status_dictionary'] = {"Aborted": "Aborted", 1714 "Verifying": "Verifying Host", 1715 "Provisioning": "Provisioning Host", 1716 "Pending": "Waiting on other hosts", 1717 "Running": "Running autoserv", 1718 "Completed": "Autoserv completed", 1719 "Failed": "Failed to complete", 1720 "Queued": "Queued", 1721 "Starting": "Next in host's queue", 1722 "Stopped": "Other host(s) failed verify", 1723 "Parsing": "Awaiting parse of final results", 1724 "Gathering": "Gathering log files", 1725 "Template": "Template job for recurring run", 1726 "Waiting": "Waiting for scheduler action", 1727 "Archiving": "Archiving results", 1728 "Resetting": "Resetting hosts"} 1729 1730 result['wmatrix_url'] = rpc_utils.get_wmatrix_url() 1731 result['is_moblab'] = bool(utils.is_moblab()) 1732 1733 return result 1734 1735 1736def get_server_time(): 1737 return datetime.datetime.now().strftime("%Y-%m-%d %H:%M") 1738 1739 1740def get_hosts_by_attribute(attribute, value): 1741 """ 1742 Get the list of valid hosts that share the same host attribute value. 1743 1744 @param attribute: String of the host attribute to check. 1745 @param value: String of the value that is shared between hosts. 1746 1747 @returns List of hostnames that all have the same host attribute and 1748 value. 1749 """ 1750 hosts = models.HostAttribute.query_objects({'attribute': attribute, 1751 'value': value}) 1752 return [row.host.hostname for row in hosts if row.host.invalid == 0] 1753