rpc_interface.py revision 54a5b675ec32d9fe5ba5593fc9ef8ceb055c0a61
1# pylint: disable-msg=C0111 2 3"""\ 4Functions to expose over the RPC interface. 5 6For all modify* and delete* functions that ask for an 'id' parameter to 7identify the object to operate on, the id may be either 8 * the database row ID 9 * the name of the object (label name, hostname, user login, etc.) 10 * a dictionary containing uniquely identifying field (this option should seldom 11 be used) 12 13When specifying foreign key fields (i.e. adding hosts to a label, or adding 14users to an ACL group), the given value may be either the database row ID or the 15name of the object. 16 17All get* functions return lists of dictionaries. Each dictionary represents one 18object and maps field names to values. 19 20Some examples: 21modify_host(2, hostname='myhost') # modify hostname of host with database ID 2 22modify_host('ipaj2', hostname='myhost') # modify hostname of host 'ipaj2' 23modify_test('sleeptest', test_type='Client', params=', seconds=60') 24delete_acl_group(1) # delete by ID 25delete_acl_group('Everyone') # delete by name 26acl_group_add_users('Everyone', ['mbligh', 'showard']) 27get_jobs(owner='showard', status='Queued') 28 29See doctests/001_rpc_test.txt for (lots) more examples. 30""" 31 32__author__ = 'showard@google.com (Steve Howard)' 33 34import sys 35import datetime 36import logging 37 38from django.db.models import Count 39import common 40from autotest_lib.client.common_lib import priorities 41from autotest_lib.client.common_lib.cros import dev_server 42from autotest_lib.client.common_lib.cros.graphite import autotest_stats 43from autotest_lib.frontend.afe import control_file, rpc_utils 44from autotest_lib.frontend.afe import models, model_logic, model_attributes 45from autotest_lib.frontend.afe import site_rpc_interface 46from autotest_lib.frontend.tko import models as tko_models 47from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface 48from autotest_lib.server import frontend 49from autotest_lib.server import utils 50from autotest_lib.server.cros import provision 51from autotest_lib.server.cros.dynamic_suite import tools 52from autotest_lib.site_utils import status_history 53 54 55_timer = autotest_stats.Timer('rpc_interface') 56 57def get_parameterized_autoupdate_image_url(job): 58 """Get the parameterized autoupdate image url from a parameterized job.""" 59 known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob') 60 image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj, 61 name='image') 62 para_set = job.parameterized_job.parameterizedjobparameter_set 63 job_test_para = para_set.get(test_parameter=image_parameter) 64 return job_test_para.parameter_value 65 66 67# labels 68 69def modify_label(id, **data): 70 """Modify a label. 71 72 @param id: id or name of a label. More often a label name. 73 @param data: New data for a label. 74 """ 75 label_model = models.Label.smart_get(id) 76 label_model.update_object(data) 77 78 # Master forwards the RPC to shards 79 if not utils.is_shard(): 80 rpc_utils.fanout_rpc(label_model.host_set.all(), 'modify_label', False, 81 id=id, **data) 82 83 84def delete_label(id): 85 """Delete a label. 86 87 @param id: id or name of a label. More often a label name. 88 """ 89 label_model = models.Label.smart_get(id) 90 # Hosts that have the label to be deleted. Save this info before 91 # the label is deleted to use it later. 92 hosts = [] 93 for h in label_model.host_set.all(): 94 hosts.append(models.Host.smart_get(h.id)) 95 label_model.delete() 96 97 # Master forwards the RPC to shards 98 if not utils.is_shard(): 99 rpc_utils.fanout_rpc(hosts, 'delete_label', False, id=id) 100 101 102def add_label(name, ignore_exception_if_exists=False, **kwargs): 103 """Adds a new label of a given name. 104 105 @param name: label name. 106 @param ignore_exception_if_exists: If True and the exception was 107 thrown due to the duplicated label name when adding a label, 108 then suppress the exception. Default is False. 109 @param kwargs: keyword args that store more info about a label 110 other than the name. 111 @return: int/long id of a new label. 112 """ 113 # models.Label.add_object() throws model_logic.ValidationError 114 # when it is given a label name that already exists. 115 # However, ValidationError can be thrown with different errors, 116 # and those errors should be thrown up to the call chain. 117 try: 118 label = models.Label.add_object(name=name, **kwargs) 119 except: 120 exc_info = sys.exc_info() 121 if ignore_exception_if_exists: 122 label = rpc_utils.get_label(name) 123 # If the exception is raised not because of duplicated 124 # "name", then raise the original exception. 125 if label is None: 126 raise exc_info[0], exc_info[1], exc_info[2] 127 else: 128 raise exc_info[0], exc_info[1], exc_info[2] 129 return label.id 130 131 132def add_label_to_hosts(id, hosts): 133 """Adds a label of the given id to the given hosts only in local DB. 134 135 @param id: id or name of a label. More often a label name. 136 @param hosts: The hostnames of hosts that need the label. 137 138 @raises models.Label.DoesNotExist: If the label with id doesn't exist. 139 """ 140 label = models.Label.smart_get(id) 141 host_objs = models.Host.smart_get_bulk(hosts) 142 if label.platform: 143 models.Host.check_no_platform(host_objs) 144 label.host_set.add(*host_objs) 145 146 147def _create_label_everywhere(id, hosts): 148 """ 149 Yet another method to create labels. 150 151 ALERT! This method should be run only on master not shards! 152 DO NOT RUN THIS ON A SHARD!!! Deputies will hate you if you do!!! 153 154 This method exists primarily to serve label_add_hosts() and 155 host_add_labels(). Basically it pulls out the label check/add logic 156 from label_add_hosts() into this nice method that not only creates 157 the label but also tells the shards that service the hosts to also 158 create the label. 159 160 @param id: id or name of a label. More often a label name. 161 @param hosts: A list of hostnames or ids. More often hostnames. 162 """ 163 try: 164 label = models.Label.smart_get(id) 165 except models.Label.DoesNotExist: 166 # This matches the type checks in smart_get, which is a hack 167 # in and off itself. The aim here is to create any non-existent 168 # label, which we cannot do if the 'id' specified isn't a label name. 169 if isinstance(id, basestring): 170 label = models.Label.smart_get(add_label(id)) 171 else: 172 raise ValueError('Label id (%s) does not exist. Please specify ' 173 'the argument, id, as a string (label name).' 174 % id) 175 176 # Make sure the label exists on the shard with the same id 177 # as it is on the master. 178 # It is possible that the label is already in a shard because 179 # we are adding a new label only to shards of hosts that the label 180 # is going to be attached. 181 # For example, we add a label L1 to a host in shard S1. 182 # Master and S1 will have L1 but other shards won't. 183 # Later, when we add the same label L1 to hosts in shards S1 and S2, 184 # S1 already has the label but S2 doesn't. 185 # S2 should have the new label without any problem. 186 # We ignore exception in such a case. 187 host_objs = models.Host.smart_get_bulk(hosts) 188 rpc_utils.fanout_rpc( 189 host_objs, 'add_label', include_hostnames=False, 190 name=label.name, ignore_exception_if_exists=True, 191 id=label.id, platform=label.platform) 192 193 194@rpc_utils.route_rpc_to_master 195def label_add_hosts(id, hosts): 196 """Adds a label with the given id to the given hosts. 197 198 This method should be run only on master not shards. 199 The given label will be created if it doesn't exist, provided the `id` 200 supplied is a label name not an int/long id. 201 202 @param id: id or name of a label. More often a label name. 203 @param hosts: A list of hostnames or ids. More often hostnames. 204 205 @raises ValueError: If the id specified is an int/long (label id) 206 while the label does not exist. 207 """ 208 # Create the label. 209 _create_label_everywhere(id, hosts) 210 211 # Add it to the master. 212 add_label_to_hosts(id, hosts) 213 214 # Add it to the shards. 215 host_objs = models.Host.smart_get_bulk(hosts) 216 rpc_utils.fanout_rpc(host_objs, 'add_label_to_hosts', id=id) 217 218 219def remove_label_from_hosts(id, hosts): 220 """Removes a label of the given id from the given hosts only in local DB. 221 222 @param id: id or name of a label. 223 @param hosts: The hostnames of hosts that need to remove the label from. 224 """ 225 host_objs = models.Host.smart_get_bulk(hosts) 226 models.Label.smart_get(id).host_set.remove(*host_objs) 227 228 229@rpc_utils.route_rpc_to_master 230def label_remove_hosts(id, hosts): 231 """Removes a label of the given id from the given hosts. 232 233 This method should be run only on master not shards. 234 235 @param id: id or name of a label. 236 @param hosts: A list of hostnames or ids. More often hostnames. 237 """ 238 host_objs = models.Host.smart_get_bulk(hosts) 239 remove_label_from_hosts(id, hosts) 240 241 rpc_utils.fanout_rpc(host_objs, 'remove_label_from_hosts', id=id) 242 243 244def get_labels(exclude_filters=(), **filter_data): 245 """\ 246 @param exclude_filters: A sequence of dictionaries of filters. 247 248 @returns A sequence of nested dictionaries of label information. 249 """ 250 labels = models.Label.query_objects(filter_data) 251 for exclude_filter in exclude_filters: 252 labels = labels.exclude(**exclude_filter) 253 return rpc_utils.prepare_rows_as_nested_dicts(labels, ('atomic_group',)) 254 255 256# atomic groups 257 258def add_atomic_group(name, max_number_of_machines=None, description=None): 259 return models.AtomicGroup.add_object( 260 name=name, max_number_of_machines=max_number_of_machines, 261 description=description).id 262 263 264def modify_atomic_group(id, **data): 265 models.AtomicGroup.smart_get(id).update_object(data) 266 267 268def delete_atomic_group(id): 269 models.AtomicGroup.smart_get(id).delete() 270 271 272def atomic_group_add_labels(id, labels): 273 label_objs = models.Label.smart_get_bulk(labels) 274 models.AtomicGroup.smart_get(id).label_set.add(*label_objs) 275 276 277def atomic_group_remove_labels(id, labels): 278 label_objs = models.Label.smart_get_bulk(labels) 279 models.AtomicGroup.smart_get(id).label_set.remove(*label_objs) 280 281 282def get_atomic_groups(**filter_data): 283 return rpc_utils.prepare_for_serialization( 284 models.AtomicGroup.list_objects(filter_data)) 285 286 287# hosts 288 289def add_host(hostname, status=None, locked=None, lock_reason='', protection=None): 290 if locked and not lock_reason: 291 raise model_logic.ValidationError( 292 {'locked': 'Please provide a reason for locking when adding host.'}) 293 294 return models.Host.add_object(hostname=hostname, status=status, 295 locked=locked, lock_reason=lock_reason, 296 protection=protection).id 297 298 299@rpc_utils.route_rpc_to_master 300def modify_host(id, **kwargs): 301 """Modify local attributes of a host. 302 303 If this is called on the master, but the host is assigned to a shard, this 304 will call `modify_host_local` RPC to the responsible shard. This means if 305 a host is being locked using this function, this change will also propagate 306 to shards. 307 When this is called on a shard, the shard just routes the RPC to the master 308 and does nothing. 309 310 @param id: id of the host to modify. 311 @param kwargs: key=value pairs of values to set on the host. 312 """ 313 rpc_utils.check_modify_host(kwargs) 314 host = models.Host.smart_get(id) 315 try: 316 rpc_utils.check_modify_host_locking(host, kwargs) 317 except model_logic.ValidationError as e: 318 if not kwargs.get('force_modify_locking', False): 319 raise 320 logging.exception('The following exception will be ignored and lock ' 321 'modification will be enforced. %s', e) 322 323 # This is required to make `lock_time` for a host be exactly same 324 # between the master and a shard. 325 if kwargs.get('locked', None) and 'lock_time' not in kwargs: 326 kwargs['lock_time'] = datetime.datetime.now() 327 host.update_object(kwargs) 328 329 # force_modifying_locking is not an internal field in database, remove. 330 kwargs.pop('force_modify_locking', None) 331 rpc_utils.fanout_rpc([host], 'modify_host_local', 332 include_hostnames=False, id=id, **kwargs) 333 334 335def modify_host_local(id, **kwargs): 336 """Modify host attributes in local DB. 337 338 @param id: Host id. 339 @param kwargs: key=value pairs of values to set on the host. 340 """ 341 models.Host.smart_get(id).update_object(kwargs) 342 343 344@rpc_utils.route_rpc_to_master 345def modify_hosts(host_filter_data, update_data): 346 """Modify local attributes of multiple hosts. 347 348 If this is called on the master, but one of the hosts in that match the 349 filters is assigned to a shard, this will call `modify_hosts_local` RPC 350 to the responsible shard. 351 When this is called on a shard, the shard just routes the RPC to the master 352 and does nothing. 353 354 The filters are always applied on the master, not on the shards. This means 355 if the states of a host differ on the master and a shard, the state on the 356 master will be used. I.e. this means: 357 A host was synced to Shard 1. On Shard 1 the status of the host was set to 358 'Repair Failed'. 359 - A call to modify_hosts with host_filter_data={'status': 'Ready'} will 360 update the host (both on the shard and on the master), because the state 361 of the host as the master knows it is still 'Ready'. 362 - A call to modify_hosts with host_filter_data={'status': 'Repair failed' 363 will not update the host, because the filter doesn't apply on the master. 364 365 @param host_filter_data: Filters out which hosts to modify. 366 @param update_data: A dictionary with the changes to make to the hosts. 367 """ 368 update_data = update_data.copy() 369 rpc_utils.check_modify_host(update_data) 370 hosts = models.Host.query_objects(host_filter_data) 371 372 affected_shard_hostnames = set() 373 affected_host_ids = [] 374 375 # Check all hosts before changing data for exception safety. 376 for host in hosts: 377 try: 378 rpc_utils.check_modify_host_locking(host, update_data) 379 except model_logic.ValidationError as e: 380 if not update_data.get('force_modify_locking', False): 381 raise 382 logging.exception('The following exception will be ignored and ' 383 'lock modification will be enforced. %s', e) 384 385 if host.shard: 386 affected_shard_hostnames.add(host.shard.rpc_hostname()) 387 affected_host_ids.append(host.id) 388 389 # This is required to make `lock_time` for a host be exactly same 390 # between the master and a shard. 391 if update_data.get('locked', None) and 'lock_time' not in update_data: 392 update_data['lock_time'] = datetime.datetime.now() 393 for host in hosts: 394 host.update_object(update_data) 395 396 update_data.pop('force_modify_locking', None) 397 # Caution: Changing the filter from the original here. See docstring. 398 rpc_utils.run_rpc_on_multiple_hostnames( 399 'modify_hosts_local', affected_shard_hostnames, 400 host_filter_data={'id__in': affected_host_ids}, 401 update_data=update_data) 402 403 404def modify_hosts_local(host_filter_data, update_data): 405 """Modify attributes of hosts in local DB. 406 407 @param host_filter_data: Filters out which hosts to modify. 408 @param update_data: A dictionary with the changes to make to the hosts. 409 """ 410 for host in models.Host.query_objects(host_filter_data): 411 host.update_object(update_data) 412 413 414def add_labels_to_host(id, labels): 415 """Adds labels to a given host only in local DB. 416 417 @param id: id or hostname for a host. 418 @param labels: ids or names for labels. 419 """ 420 label_objs = models.Label.smart_get_bulk(labels) 421 models.Host.smart_get(id).labels.add(*label_objs) 422 423 424@rpc_utils.route_rpc_to_master 425def host_add_labels(id, labels): 426 """Adds labels to a given host. 427 428 @param id: id or hostname for a host. 429 @param labels: ids or names for labels. 430 431 @raises ValidationError: If adding more than one platform label. 432 """ 433 # Create the labels on the master/shards. 434 for label in labels: 435 _create_label_everywhere(label, [id]) 436 437 label_objs = models.Label.smart_get_bulk(labels) 438 platforms = [label.name for label in label_objs if label.platform] 439 if len(platforms) > 1: 440 raise model_logic.ValidationError( 441 {'labels': 'Adding more than one platform label: %s' % 442 ', '.join(platforms)}) 443 444 host_obj = models.Host.smart_get(id) 445 if len(platforms) == 1: 446 models.Host.check_no_platform([host_obj]) 447 add_labels_to_host(id, labels) 448 449 rpc_utils.fanout_rpc([host_obj], 'add_labels_to_host', False, 450 id=id, labels=labels) 451 452 453def remove_labels_from_host(id, labels): 454 """Removes labels from a given host only in local DB. 455 456 @param id: id or hostname for a host. 457 @param labels: ids or names for labels. 458 """ 459 label_objs = models.Label.smart_get_bulk(labels) 460 models.Host.smart_get(id).labels.remove(*label_objs) 461 462 463@rpc_utils.route_rpc_to_master 464def host_remove_labels(id, labels): 465 """Removes labels from a given host. 466 467 @param id: id or hostname for a host. 468 @param labels: ids or names for labels. 469 """ 470 remove_labels_from_host(id, labels) 471 472 host_obj = models.Host.smart_get(id) 473 rpc_utils.fanout_rpc([host_obj], 'remove_labels_from_host', False, 474 id=id, labels=labels) 475 476 477def get_host_attribute(attribute, **host_filter_data): 478 """ 479 @param attribute: string name of attribute 480 @param host_filter_data: filter data to apply to Hosts to choose hosts to 481 act upon 482 """ 483 hosts = rpc_utils.get_host_query((), False, False, True, host_filter_data) 484 hosts = list(hosts) 485 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 486 'attribute_list') 487 host_attr_dicts = [] 488 for host_obj in hosts: 489 for attr_obj in host_obj.attribute_list: 490 if attr_obj.attribute == attribute: 491 host_attr_dicts.append(attr_obj.get_object_dict()) 492 return rpc_utils.prepare_for_serialization(host_attr_dicts) 493 494 495def set_host_attribute(attribute, value, **host_filter_data): 496 """ 497 @param attribute: string name of attribute 498 @param value: string, or None to delete an attribute 499 @param host_filter_data: filter data to apply to Hosts to choose hosts to 500 act upon 501 """ 502 assert host_filter_data # disallow accidental actions on all hosts 503 hosts = models.Host.query_objects(host_filter_data) 504 models.AclGroup.check_for_acl_violation_hosts(hosts) 505 for host in hosts: 506 host.set_or_delete_attribute(attribute, value) 507 508 # Master forwards this RPC to shards. 509 if not utils.is_shard(): 510 rpc_utils.fanout_rpc(hosts, 'set_host_attribute', False, 511 attribute=attribute, value=value, **host_filter_data) 512 513 514@rpc_utils.forward_single_host_rpc_to_shard 515def delete_host(id): 516 models.Host.smart_get(id).delete() 517 518 519def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 520 exclude_atomic_group_hosts=False, valid_only=True, 521 include_current_job=False, **filter_data): 522 """Get a list of dictionaries which contains the information of hosts. 523 524 @param multiple_labels: match hosts in all of the labels given. Should 525 be a list of label names. 526 @param exclude_only_if_needed_labels: Exclude hosts with at least one 527 "only_if_needed" label applied. 528 @param exclude_atomic_group_hosts: Exclude hosts that have one or more 529 atomic group labels associated with them. 530 @param include_current_job: Set to True to include ids of currently running 531 job and special task. 532 """ 533 hosts = rpc_utils.get_host_query(multiple_labels, 534 exclude_only_if_needed_labels, 535 exclude_atomic_group_hosts, 536 valid_only, filter_data) 537 hosts = list(hosts) 538 models.Host.objects.populate_relationships(hosts, models.Label, 539 'label_list') 540 models.Host.objects.populate_relationships(hosts, models.AclGroup, 541 'acl_list') 542 models.Host.objects.populate_relationships(hosts, models.HostAttribute, 543 'attribute_list') 544 host_dicts = [] 545 for host_obj in hosts: 546 host_dict = host_obj.get_object_dict() 547 host_dict['labels'] = [label.name for label in host_obj.label_list] 548 host_dict['platform'], host_dict['atomic_group'] = (rpc_utils. 549 find_platform_and_atomic_group(host_obj)) 550 host_dict['acls'] = [acl.name for acl in host_obj.acl_list] 551 host_dict['attributes'] = dict((attribute.attribute, attribute.value) 552 for attribute in host_obj.attribute_list) 553 if include_current_job: 554 host_dict['current_job'] = None 555 host_dict['current_special_task'] = None 556 entries = models.HostQueueEntry.objects.filter( 557 host_id=host_dict['id'], active=True, complete=False) 558 if entries: 559 host_dict['current_job'] = ( 560 entries[0].get_object_dict()['job']) 561 tasks = models.SpecialTask.objects.filter( 562 host_id=host_dict['id'], is_active=True, is_complete=False) 563 if tasks: 564 host_dict['current_special_task'] = ( 565 '%d-%s' % (tasks[0].get_object_dict()['id'], 566 tasks[0].get_object_dict()['task'].lower())) 567 host_dicts.append(host_dict) 568 return rpc_utils.prepare_for_serialization(host_dicts) 569 570 571def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False, 572 exclude_atomic_group_hosts=False, valid_only=True, 573 **filter_data): 574 """ 575 Same parameters as get_hosts(). 576 577 @returns The number of matching hosts. 578 """ 579 hosts = rpc_utils.get_host_query(multiple_labels, 580 exclude_only_if_needed_labels, 581 exclude_atomic_group_hosts, 582 valid_only, filter_data) 583 return hosts.count() 584 585 586# tests 587 588def add_test(name, test_type, path, author=None, dependencies=None, 589 experimental=True, run_verify=None, test_class=None, 590 test_time=None, test_category=None, description=None, 591 sync_count=1): 592 return models.Test.add_object(name=name, test_type=test_type, path=path, 593 author=author, dependencies=dependencies, 594 experimental=experimental, 595 run_verify=run_verify, test_time=test_time, 596 test_category=test_category, 597 sync_count=sync_count, 598 test_class=test_class, 599 description=description).id 600 601 602def modify_test(id, **data): 603 models.Test.smart_get(id).update_object(data) 604 605 606def delete_test(id): 607 models.Test.smart_get(id).delete() 608 609 610def get_tests(**filter_data): 611 return rpc_utils.prepare_for_serialization( 612 models.Test.list_objects(filter_data)) 613 614 615@_timer.decorate 616def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name): 617 """Gets the counts of all passed and failed tests from the matching jobs. 618 619 @param job_name_prefix: Name prefix of the jobs to get the summary from, e.g., 620 'butterfly-release/R40-6457.21.0/bvt-cq/'. 621 @param label_name: Label that must be set in the jobs, e.g., 622 'cros-version:butterfly-release/R40-6457.21.0'. 623 624 @returns A summary of the counts of all the passed and failed tests. 625 """ 626 job_ids = list(models.Job.objects.filter( 627 name__startswith=job_name_prefix, 628 dependency_labels__name=label_name).values_list( 629 'pk', flat=True)) 630 summary = {'passed': 0, 'failed': 0} 631 if not job_ids: 632 return summary 633 634 counts = (tko_models.TestView.objects.filter( 635 afe_job_id__in=job_ids).exclude( 636 test_name='SERVER_JOB').exclude( 637 test_name__startswith='CLIENT_JOB').values( 638 'status').annotate( 639 count=Count('status'))) 640 for status in counts: 641 if status['status'] == 'GOOD': 642 summary['passed'] += status['count'] 643 else: 644 summary['failed'] += status['count'] 645 return summary 646 647 648# profilers 649 650def add_profiler(name, description=None): 651 return models.Profiler.add_object(name=name, description=description).id 652 653 654def modify_profiler(id, **data): 655 models.Profiler.smart_get(id).update_object(data) 656 657 658def delete_profiler(id): 659 models.Profiler.smart_get(id).delete() 660 661 662def get_profilers(**filter_data): 663 return rpc_utils.prepare_for_serialization( 664 models.Profiler.list_objects(filter_data)) 665 666 667# users 668 669def add_user(login, access_level=None): 670 return models.User.add_object(login=login, access_level=access_level).id 671 672 673def modify_user(id, **data): 674 models.User.smart_get(id).update_object(data) 675 676 677def delete_user(id): 678 models.User.smart_get(id).delete() 679 680 681def get_users(**filter_data): 682 return rpc_utils.prepare_for_serialization( 683 models.User.list_objects(filter_data)) 684 685 686# acl groups 687 688def add_acl_group(name, description=None): 689 group = models.AclGroup.add_object(name=name, description=description) 690 group.users.add(models.User.current_user()) 691 return group.id 692 693 694def modify_acl_group(id, **data): 695 group = models.AclGroup.smart_get(id) 696 group.check_for_acl_violation_acl_group() 697 group.update_object(data) 698 group.add_current_user_if_empty() 699 700 701def acl_group_add_users(id, users): 702 group = models.AclGroup.smart_get(id) 703 group.check_for_acl_violation_acl_group() 704 users = models.User.smart_get_bulk(users) 705 group.users.add(*users) 706 707 708def acl_group_remove_users(id, users): 709 group = models.AclGroup.smart_get(id) 710 group.check_for_acl_violation_acl_group() 711 users = models.User.smart_get_bulk(users) 712 group.users.remove(*users) 713 group.add_current_user_if_empty() 714 715 716def acl_group_add_hosts(id, hosts): 717 group = models.AclGroup.smart_get(id) 718 group.check_for_acl_violation_acl_group() 719 hosts = models.Host.smart_get_bulk(hosts) 720 group.hosts.add(*hosts) 721 group.on_host_membership_change() 722 723 724def acl_group_remove_hosts(id, hosts): 725 group = models.AclGroup.smart_get(id) 726 group.check_for_acl_violation_acl_group() 727 hosts = models.Host.smart_get_bulk(hosts) 728 group.hosts.remove(*hosts) 729 group.on_host_membership_change() 730 731 732def delete_acl_group(id): 733 models.AclGroup.smart_get(id).delete() 734 735 736def get_acl_groups(**filter_data): 737 acl_groups = models.AclGroup.list_objects(filter_data) 738 for acl_group in acl_groups: 739 acl_group_obj = models.AclGroup.objects.get(id=acl_group['id']) 740 acl_group['users'] = [user.login 741 for user in acl_group_obj.users.all()] 742 acl_group['hosts'] = [host.hostname 743 for host in acl_group_obj.hosts.all()] 744 return rpc_utils.prepare_for_serialization(acl_groups) 745 746 747# jobs 748 749def generate_control_file(tests=(), kernel=None, label=None, profilers=(), 750 client_control_file='', use_container=False, 751 profile_only=None, upload_kernel_config=False, 752 db_tests=True): 753 """ 754 Generates a client-side control file to load a kernel and run tests. 755 756 @param tests List of tests to run. See db_tests for more information. 757 @param kernel A list of kernel info dictionaries configuring which kernels 758 to boot for this job and other options for them 759 @param label Name of label to grab kernel config from. 760 @param profilers List of profilers to activate during the job. 761 @param client_control_file The contents of a client-side control file to 762 run at the end of all tests. If this is supplied, all tests must be 763 client side. 764 TODO: in the future we should support server control files directly 765 to wrap with a kernel. That'll require changing the parameter 766 name and adding a boolean to indicate if it is a client or server 767 control file. 768 @param use_container unused argument today. TODO: Enable containers 769 on the host during a client side test. 770 @param profile_only A boolean that indicates what default profile_only 771 mode to use in the control file. Passing None will generate a 772 control file that does not explcitly set the default mode at all. 773 @param upload_kernel_config: if enabled it will generate server control 774 file code that uploads the kernel config file to the client and 775 tells the client of the new (local) path when compiling the kernel; 776 the tests must be server side tests 777 @param db_tests: if True, the test object can be found in the database 778 backing the test model. In this case, tests is a tuple 779 of test IDs which are used to retrieve the test objects 780 from the database. If False, tests is a tuple of test 781 dictionaries stored client-side in the AFE. 782 783 @returns a dict with the following keys: 784 control_file: str, The control file text. 785 is_server: bool, is the control file a server-side control file? 786 synch_count: How many machines the job uses per autoserv execution. 787 synch_count == 1 means the job is asynchronous. 788 dependencies: A list of the names of labels on which the job depends. 789 """ 790 if not tests and not client_control_file: 791 return dict(control_file='', is_server=False, synch_count=1, 792 dependencies=[]) 793 794 cf_info, test_objects, profiler_objects, label = ( 795 rpc_utils.prepare_generate_control_file(tests, kernel, label, 796 profilers, db_tests)) 797 cf_info['control_file'] = control_file.generate_control( 798 tests=test_objects, kernels=kernel, platform=label, 799 profilers=profiler_objects, is_server=cf_info['is_server'], 800 client_control_file=client_control_file, profile_only=profile_only, 801 upload_kernel_config=upload_kernel_config) 802 return cf_info 803 804 805def create_parameterized_job(name, priority, test, parameters, kernel=None, 806 label=None, profilers=(), profiler_parameters=None, 807 use_container=False, profile_only=None, 808 upload_kernel_config=False, hosts=(), 809 meta_hosts=(), one_time_hosts=(), 810 atomic_group_name=None, synch_count=None, 811 is_template=False, timeout=None, 812 timeout_mins=None, max_runtime_mins=None, 813 run_verify=False, email_list='', dependencies=(), 814 reboot_before=None, reboot_after=None, 815 parse_failed_repair=None, hostless=False, 816 keyvals=None, drone_set=None, run_reset=True, 817 require_ssp=None): 818 """ 819 Creates and enqueues a parameterized job. 820 821 Most parameters a combination of the parameters for generate_control_file() 822 and create_job(), with the exception of: 823 824 @param test name or ID of the test to run 825 @param parameters a map of parameter name -> 826 tuple of (param value, param type) 827 @param profiler_parameters a dictionary of parameters for the profilers: 828 key: profiler name 829 value: dict of param name -> tuple of 830 (param value, 831 param type) 832 """ 833 # Save the values of the passed arguments here. What we're going to do with 834 # them is pass them all to rpc_utils.get_create_job_common_args(), which 835 # will extract the subset of these arguments that apply for 836 # rpc_utils.create_job_common(), which we then pass in to that function. 837 args = locals() 838 839 # Set up the parameterized job configs 840 test_obj = models.Test.smart_get(test) 841 control_type = test_obj.test_type 842 843 try: 844 label = models.Label.smart_get(label) 845 except models.Label.DoesNotExist: 846 label = None 847 848 kernel_objs = models.Kernel.create_kernels(kernel) 849 profiler_objs = [models.Profiler.smart_get(profiler) 850 for profiler in profilers] 851 852 parameterized_job = models.ParameterizedJob.objects.create( 853 test=test_obj, label=label, use_container=use_container, 854 profile_only=profile_only, 855 upload_kernel_config=upload_kernel_config) 856 parameterized_job.kernels.add(*kernel_objs) 857 858 for profiler in profiler_objs: 859 parameterized_profiler = models.ParameterizedJobProfiler.objects.create( 860 parameterized_job=parameterized_job, 861 profiler=profiler) 862 profiler_params = profiler_parameters.get(profiler.name, {}) 863 for name, (value, param_type) in profiler_params.iteritems(): 864 models.ParameterizedJobProfilerParameter.objects.create( 865 parameterized_job_profiler=parameterized_profiler, 866 parameter_name=name, 867 parameter_value=value, 868 parameter_type=param_type) 869 870 try: 871 for parameter in test_obj.testparameter_set.all(): 872 if parameter.name in parameters: 873 param_value, param_type = parameters.pop(parameter.name) 874 parameterized_job.parameterizedjobparameter_set.create( 875 test_parameter=parameter, parameter_value=param_value, 876 parameter_type=param_type) 877 878 if parameters: 879 raise Exception('Extra parameters remain: %r' % parameters) 880 881 return rpc_utils.create_job_common( 882 parameterized_job=parameterized_job.id, 883 control_type=control_type, 884 **rpc_utils.get_create_job_common_args(args)) 885 except: 886 parameterized_job.delete() 887 raise 888 889 890def create_job_page_handler(name, priority, control_file, control_type, 891 image=None, hostless=False, firmware_rw_build=None, 892 firmware_ro_build=None, test_source_build=None, 893 **kwargs): 894 """\ 895 Create and enqueue a job. 896 897 @param name name of this job 898 @param priority Integer priority of this job. Higher is more important. 899 @param control_file String contents of the control file. 900 @param control_type Type of control file, Client or Server. 901 @param image: ChromeOS build to be installed in the dut. Default to None. 902 @param firmware_rw_build: Firmware build to update RW firmware. Default to 903 None, i.e., RW firmware will not be updated. 904 @param firmware_ro_build: Firmware build to update RO firmware. Default to 905 None, i.e., RO firmware will not be updated. 906 @param test_source_build: Build to be used to retrieve test code. Default 907 to None. 908 @param kwargs extra args that will be required by create_suite_job or 909 create_job. 910 911 @returns The created Job id number. 912 """ 913 control_file = rpc_utils.encode_ascii(control_file) 914 if not control_file: 915 raise model_logic.ValidationError({ 916 'control_file' : "Control file cannot be empty"}) 917 918 if image and hostless: 919 builds = {} 920 builds[provision.CROS_VERSION_PREFIX] = image 921 if firmware_rw_build: 922 builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build 923 if firmware_ro_build: 924 builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build 925 return site_rpc_interface.create_suite_job( 926 name=name, control_file=control_file, priority=priority, 927 builds=builds, test_source_build=test_source_build, **kwargs) 928 return create_job(name, priority, control_file, control_type, image=image, 929 hostless=hostless, **kwargs) 930 931 932@rpc_utils.route_rpc_to_master 933def create_job(name, priority, control_file, control_type, 934 hosts=(), meta_hosts=(), one_time_hosts=(), 935 atomic_group_name=None, synch_count=None, is_template=False, 936 timeout=None, timeout_mins=None, max_runtime_mins=None, 937 run_verify=False, email_list='', dependencies=(), 938 reboot_before=None, reboot_after=None, parse_failed_repair=None, 939 hostless=False, keyvals=None, drone_set=None, image=None, 940 parent_job_id=None, test_retry=0, run_reset=True, 941 require_ssp=None, args=(), **kwargs): 942 """\ 943 Create and enqueue a job. 944 945 @param name name of this job 946 @param priority Integer priority of this job. Higher is more important. 947 @param control_file String contents of the control file. 948 @param control_type Type of control file, Client or Server. 949 @param synch_count How many machines the job uses per autoserv execution. 950 synch_count == 1 means the job is asynchronous. If an atomic group is 951 given this value is treated as a minimum. 952 @param is_template If true then create a template job. 953 @param timeout Hours after this call returns until the job times out. 954 @param timeout_mins Minutes after this call returns until the job times 955 out. 956 @param max_runtime_mins Minutes from job starting time until job times out 957 @param run_verify Should the host be verified before running the test? 958 @param email_list String containing emails to mail when the job is done 959 @param dependencies List of label names on which this job depends 960 @param reboot_before Never, If dirty, or Always 961 @param reboot_after Never, If all tests passed, or Always 962 @param parse_failed_repair if true, results of failed repairs launched by 963 this job will be parsed as part of the job. 964 @param hostless if true, create a hostless job 965 @param keyvals dict of keyvals to associate with the job 966 @param hosts List of hosts to run job on. 967 @param meta_hosts List where each entry is a label name, and for each entry 968 one host will be chosen from that label to run the job on. 969 @param one_time_hosts List of hosts not in the database to run the job on. 970 @param atomic_group_name The name of an atomic group to schedule the job on. 971 @param drone_set The name of the drone set to run this test on. 972 @param image OS image to install before running job. 973 @param parent_job_id id of a job considered to be parent of created job. 974 @param test_retry Number of times to retry test if the test did not 975 complete successfully. (optional, default: 0) 976 @param run_reset Should the host be reset before running the test? 977 @param require_ssp Set to True to require server-side packaging to run the 978 test. If it's set to None, drone will still try to run 979 the server side with server-side packaging. If the 980 autotest-server package doesn't exist for the build or 981 image is not set, drone will run the test without server- 982 side packaging. Default is None. 983 @param args A list of args to be injected into control file. 984 @param kwargs extra keyword args. NOT USED. 985 986 @returns The created Job id number. 987 """ 988 if args: 989 control_file = tools.inject_vars({'args': args}, control_file) 990 991 if image is None: 992 return rpc_utils.create_job_common( 993 **rpc_utils.get_create_job_common_args(locals())) 994 995 # Translate the image name, in case its a relative build name. 996 ds = dev_server.ImageServer.resolve(image) 997 image = ds.translate(image) 998 999 # When image is supplied use a known parameterized test already in the 1000 # database to pass the OS image path from the front end, through the 1001 # scheduler, and finally to autoserv as the --image parameter. 1002 1003 # The test autoupdate_ParameterizedJob is in afe_autotests and used to 1004 # instantiate a Test object and from there a ParameterizedJob. 1005 known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob') 1006 known_parameterized_job = models.ParameterizedJob.objects.create( 1007 test=known_test_obj) 1008 1009 # autoupdate_ParameterizedJob has a single parameter, the image parameter, 1010 # stored in the table afe_test_parameters. We retrieve and set this 1011 # instance of the parameter to the OS image path. 1012 image_parameter = known_test_obj.testparameter_set.get(test=known_test_obj, 1013 name='image') 1014 known_parameterized_job.parameterizedjobparameter_set.create( 1015 test_parameter=image_parameter, parameter_value=image, 1016 parameter_type='string') 1017 1018 # TODO(crbug.com/502638): save firmware build etc to parameterized_job. 1019 1020 # By passing a parameterized_job to create_job_common the job entry in 1021 # the afe_jobs table will have the field parameterized_job_id set. 1022 # The scheduler uses this id in the afe_parameterized_jobs table to 1023 # match this job to our known test, and then with the 1024 # afe_parameterized_job_parameters table to get the actual image path. 1025 return rpc_utils.create_job_common( 1026 parameterized_job=known_parameterized_job.id, 1027 **rpc_utils.get_create_job_common_args(locals())) 1028 1029 1030def abort_host_queue_entries(**filter_data): 1031 """\ 1032 Abort a set of host queue entries. 1033 1034 @return: A list of dictionaries, each contains information 1035 about an aborted HQE. 1036 """ 1037 query = models.HostQueueEntry.query_objects(filter_data) 1038 1039 # Dont allow aborts on: 1040 # 1. Jobs that have already completed (whether or not they were aborted) 1041 # 2. Jobs that we have already been aborted (but may not have completed) 1042 query = query.filter(complete=False).filter(aborted=False) 1043 models.AclGroup.check_abort_permissions(query) 1044 host_queue_entries = list(query.select_related()) 1045 rpc_utils.check_abort_synchronous_jobs(host_queue_entries) 1046 1047 models.HostQueueEntry.abort_host_queue_entries(host_queue_entries) 1048 hqe_info = [{'HostQueueEntry': hqe.id, 'Job': hqe.job_id, 1049 'Job name': hqe.job.name} for hqe in host_queue_entries] 1050 return hqe_info 1051 1052 1053def abort_special_tasks(**filter_data): 1054 """\ 1055 Abort the special task, or tasks, specified in the filter. 1056 """ 1057 query = models.SpecialTask.query_objects(filter_data) 1058 special_tasks = query.filter(is_active=True) 1059 for task in special_tasks: 1060 task.abort() 1061 1062 1063def _call_special_tasks_on_hosts(task, hosts): 1064 """\ 1065 Schedules a set of hosts for a special task. 1066 1067 @returns A list of hostnames that a special task was created for. 1068 """ 1069 models.AclGroup.check_for_acl_violation_hosts(hosts) 1070 shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts) 1071 if shard_host_map and not utils.is_shard(): 1072 raise ValueError('The following hosts are on shards, please ' 1073 'follow the link to the shards and create jobs ' 1074 'there instead. %s.' % shard_host_map) 1075 for host in hosts: 1076 models.SpecialTask.schedule_special_task(host, task) 1077 return list(sorted(host.hostname for host in hosts)) 1078 1079 1080def _forward_special_tasks_on_hosts(task, rpc, **filter_data): 1081 """Forward special tasks to corresponding shards. 1082 1083 For master, when special tasks are fired on hosts that are sharded, 1084 forward the RPC to corresponding shards. 1085 1086 For shard, create special task records in local DB. 1087 1088 @param task: Enum value of frontend.afe.models.SpecialTask.Task 1089 @param rpc: RPC name to forward. 1090 @param filter_data: Filter keywords to be used for DB query. 1091 1092 @return: A list of hostnames that a special task was created for. 1093 """ 1094 hosts = models.Host.query_objects(filter_data) 1095 shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts, rpc_hostnames=True) 1096 1097 # Filter out hosts on a shard from those on the master, forward 1098 # rpcs to the shard with an additional hostname__in filter, and 1099 # create a local SpecialTask for each remaining host. 1100 if shard_host_map and not utils.is_shard(): 1101 hosts = [h for h in hosts if h.shard is None] 1102 for shard, hostnames in shard_host_map.iteritems(): 1103 1104 # The main client of this module is the frontend website, and 1105 # it invokes it with an 'id' or an 'id__in' filter. Regardless, 1106 # the 'hostname' filter should narrow down the list of hosts on 1107 # each shard even though we supply all the ids in filter_data. 1108 # This method uses hostname instead of id because it fits better 1109 # with the overall architecture of redirection functions in 1110 # rpc_utils. 1111 shard_filter = filter_data.copy() 1112 shard_filter['hostname__in'] = hostnames 1113 rpc_utils.run_rpc_on_multiple_hostnames( 1114 rpc, [shard], **shard_filter) 1115 1116 # There is a race condition here if someone assigns a shard to one of these 1117 # hosts before we create the task. The host will stay on the master if: 1118 # 1. The host is not Ready 1119 # 2. The host is Ready but has a task 1120 # But if the host is Ready and doesn't have a task yet, it will get sent 1121 # to the shard as we're creating a task here. 1122 1123 # Given that we only rarely verify Ready hosts it isn't worth putting this 1124 # entire method in a transaction. The worst case scenario is that we have 1125 # a verify running on a Ready host while the shard is using it, if the 1126 # verify fails no subsequent tasks will be created against the host on the 1127 # master, and verifies are safe enough that this is OK. 1128 return _call_special_tasks_on_hosts(task, hosts) 1129 1130 1131def reverify_hosts(**filter_data): 1132 """\ 1133 Schedules a set of hosts for verify. 1134 1135 @returns A list of hostnames that a verify task was created for. 1136 """ 1137 return _forward_special_tasks_on_hosts( 1138 models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data) 1139 1140 1141def repair_hosts(**filter_data): 1142 """\ 1143 Schedules a set of hosts for repair. 1144 1145 @returns A list of hostnames that a repair task was created for. 1146 """ 1147 return _forward_special_tasks_on_hosts( 1148 models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data) 1149 1150 1151def get_jobs(not_yet_run=False, running=False, finished=False, 1152 suite=False, sub=False, standalone=False, **filter_data): 1153 """\ 1154 Extra status filter args for get_jobs: 1155 -not_yet_run: Include only jobs that have not yet started running. 1156 -running: Include only jobs that have start running but for which not 1157 all hosts have completed. 1158 -finished: Include only jobs for which all hosts have completed (or 1159 aborted). 1160 1161 Extra type filter args for get_jobs: 1162 -suite: Include only jobs with child jobs. 1163 -sub: Include only jobs with a parent job. 1164 -standalone: Inlcude only jobs with no child or parent jobs. 1165 At most one of these three fields should be specified. 1166 """ 1167 extra_args = rpc_utils.extra_job_status_filters(not_yet_run, 1168 running, 1169 finished) 1170 filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, 1171 suite, 1172 sub, 1173 standalone) 1174 job_dicts = [] 1175 jobs = list(models.Job.query_objects(filter_data)) 1176 models.Job.objects.populate_relationships(jobs, models.Label, 1177 'dependencies') 1178 models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals') 1179 for job in jobs: 1180 job_dict = job.get_object_dict() 1181 job_dict['dependencies'] = ','.join(label.name 1182 for label in job.dependencies) 1183 job_dict['keyvals'] = dict((keyval.key, keyval.value) 1184 for keyval in job.keyvals) 1185 if job.parameterized_job: 1186 job_dict['image'] = get_parameterized_autoupdate_image_url(job) 1187 job_dicts.append(job_dict) 1188 return rpc_utils.prepare_for_serialization(job_dicts) 1189 1190 1191def get_num_jobs(not_yet_run=False, running=False, finished=False, 1192 suite=False, sub=False, standalone=False, 1193 **filter_data): 1194 """\ 1195 See get_jobs() for documentation of extra filter parameters. 1196 """ 1197 extra_args = rpc_utils.extra_job_status_filters(not_yet_run, 1198 running, 1199 finished) 1200 filter_data['extra_args'] = rpc_utils.extra_job_type_filters(extra_args, 1201 suite, 1202 sub, 1203 standalone) 1204 return models.Job.query_count(filter_data) 1205 1206 1207def get_jobs_summary(**filter_data): 1208 """\ 1209 Like get_jobs(), but adds 'status_counts' and 'result_counts' field. 1210 1211 'status_counts' filed is a dictionary mapping status strings to the number 1212 of hosts currently with that status, i.e. {'Queued' : 4, 'Running' : 2}. 1213 1214 'result_counts' field is piped to tko's rpc_interface and has the return 1215 format specified under get_group_counts. 1216 """ 1217 jobs = get_jobs(**filter_data) 1218 ids = [job['id'] for job in jobs] 1219 all_status_counts = models.Job.objects.get_status_counts(ids) 1220 for job in jobs: 1221 job['status_counts'] = all_status_counts[job['id']] 1222 job['result_counts'] = tko_rpc_interface.get_status_counts( 1223 ['afe_job_id', 'afe_job_id'], 1224 header_groups=[['afe_job_id'], ['afe_job_id']], 1225 **{'afe_job_id': job['id']}) 1226 return rpc_utils.prepare_for_serialization(jobs) 1227 1228 1229def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None): 1230 """\ 1231 Retrieves all the information needed to clone a job. 1232 """ 1233 job = models.Job.objects.get(id=id) 1234 job_info = rpc_utils.get_job_info(job, 1235 preserve_metahosts, 1236 queue_entry_filter_data) 1237 1238 host_dicts = [] 1239 for host in job_info['hosts']: 1240 host_dict = get_hosts(id=host.id)[0] 1241 other_labels = host_dict['labels'] 1242 if host_dict['platform']: 1243 other_labels.remove(host_dict['platform']) 1244 host_dict['other_labels'] = ', '.join(other_labels) 1245 host_dicts.append(host_dict) 1246 1247 for host in job_info['one_time_hosts']: 1248 host_dict = dict(hostname=host.hostname, 1249 id=host.id, 1250 platform='(one-time host)', 1251 locked_text='') 1252 host_dicts.append(host_dict) 1253 1254 # convert keys from Label objects to strings (names of labels) 1255 meta_host_counts = dict((meta_host.name, count) for meta_host, count 1256 in job_info['meta_host_counts'].iteritems()) 1257 1258 info = dict(job=job.get_object_dict(), 1259 meta_host_counts=meta_host_counts, 1260 hosts=host_dicts) 1261 info['job']['dependencies'] = job_info['dependencies'] 1262 if job_info['atomic_group']: 1263 info['atomic_group_name'] = (job_info['atomic_group']).name 1264 else: 1265 info['atomic_group_name'] = None 1266 info['hostless'] = job_info['hostless'] 1267 info['drone_set'] = job.drone_set and job.drone_set.name 1268 1269 if job.parameterized_job: 1270 info['job']['image'] = get_parameterized_autoupdate_image_url(job) 1271 1272 return rpc_utils.prepare_for_serialization(info) 1273 1274 1275# host queue entries 1276 1277def get_host_queue_entries(start_time=None, end_time=None, **filter_data): 1278 """\ 1279 @returns A sequence of nested dictionaries of host and job information. 1280 """ 1281 filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 1282 'started_on__lte', 1283 start_time, 1284 end_time, 1285 **filter_data) 1286 return rpc_utils.prepare_rows_as_nested_dicts( 1287 models.HostQueueEntry.query_objects(filter_data), 1288 ('host', 'atomic_group', 'job')) 1289 1290 1291def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data): 1292 """\ 1293 Get the number of host queue entries associated with this job. 1294 """ 1295 filter_data = rpc_utils.inject_times_to_filter('started_on__gte', 1296 'started_on__lte', 1297 start_time, 1298 end_time, 1299 **filter_data) 1300 return models.HostQueueEntry.query_count(filter_data) 1301 1302 1303def get_hqe_percentage_complete(**filter_data): 1304 """ 1305 Computes the fraction of host queue entries matching the given filter data 1306 that are complete. 1307 """ 1308 query = models.HostQueueEntry.query_objects(filter_data) 1309 complete_count = query.filter(complete=True).count() 1310 total_count = query.count() 1311 if total_count == 0: 1312 return 1 1313 return float(complete_count) / total_count 1314 1315 1316# special tasks 1317 1318def get_special_tasks(**filter_data): 1319 """Get special task entries from the local database. 1320 1321 Query the special tasks table for tasks matching the given 1322 `filter_data`, and return a list of the results. No attempt is 1323 made to forward the call to shards; the buck will stop here. 1324 The caller is expected to know the target shard for such reasons 1325 as: 1326 * The caller is a service (such as gs_offloader) configured 1327 to operate on behalf of one specific shard, and no other. 1328 * The caller has a host as a parameter, and knows that this is 1329 the shard assigned to that host. 1330 1331 @param filter_data Filter keywords to pass to the underlying 1332 database query. 1333 1334 """ 1335 return rpc_utils.prepare_rows_as_nested_dicts( 1336 models.SpecialTask.query_objects(filter_data), 1337 ('host', 'queue_entry')) 1338 1339 1340def get_host_special_tasks(host_id, **filter_data): 1341 """Get special task entries for a given host. 1342 1343 Query the special tasks table for tasks that ran on the host 1344 given by `host_id` and matching the given `filter_data`. 1345 Return a list of the results. If the host is assigned to a 1346 shard, forward this call to that shard. 1347 1348 @param host_id Id in the database of the target host. 1349 @param filter_data Filter keywords to pass to the underlying 1350 database query. 1351 1352 """ 1353 # Retrieve host data even if the host is in an invalid state. 1354 host = models.Host.smart_get(host_id, False) 1355 if not host.shard: 1356 return get_special_tasks(host_id=host_id, **filter_data) 1357 else: 1358 # The return values from AFE methods are post-processed 1359 # objects that aren't JSON-serializable. So, we have to 1360 # call AFE.run() to get the raw, serializable output from 1361 # the shard. 1362 shard_afe = frontend.AFE(server=host.shard.rpc_hostname()) 1363 return shard_afe.run('get_special_tasks', 1364 host_id=host_id, **filter_data) 1365 1366 1367def get_num_special_tasks(**kwargs): 1368 """Get the number of special task entries from the local database. 1369 1370 Query the special tasks table for tasks matching the given 'kwargs', 1371 and return the number of the results. No attempt is made to forward 1372 the call to shards; the buck will stop here. 1373 1374 @param kwargs Filter keywords to pass to the underlying database query. 1375 1376 """ 1377 return models.SpecialTask.query_count(kwargs) 1378 1379 1380def get_host_num_special_tasks(host, **kwargs): 1381 """Get special task entries for a given host. 1382 1383 Query the special tasks table for tasks that ran on the host 1384 given by 'host' and matching the given 'kwargs'. 1385 Return a list of the results. If the host is assigned to a 1386 shard, forward this call to that shard. 1387 1388 @param host id or name of a host. More often a hostname. 1389 @param kwargs Filter keywords to pass to the underlying database query. 1390 1391 """ 1392 # Retrieve host data even if the host is in an invalid state. 1393 host_model = models.Host.smart_get(host, False) 1394 if not host_model.shard: 1395 return get_num_special_tasks(host=host, **kwargs) 1396 else: 1397 shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname()) 1398 return shard_afe.run('get_num_special_tasks', host=host, **kwargs) 1399 1400 1401def get_status_task(host_id, end_time): 1402 """Get the "status task" for a host from the local shard. 1403 1404 Returns a single special task representing the given host's 1405 "status task". The status task is a completed special task that 1406 identifies whether the corresponding host was working or broken 1407 when it completed. A successful task indicates a working host; 1408 a failed task indicates broken. 1409 1410 This call will not be forward to a shard; the receiving server 1411 must be the shard that owns the host. 1412 1413 @param host_id Id in the database of the target host. 1414 @param end_time Time reference for the host's status. 1415 1416 @return A single task; its status (successful or not) 1417 corresponds to the status of the host (working or 1418 broken) at the given time. If no task is found, return 1419 `None`. 1420 1421 """ 1422 tasklist = rpc_utils.prepare_rows_as_nested_dicts( 1423 status_history.get_status_task(host_id, end_time), 1424 ('host', 'queue_entry')) 1425 return tasklist[0] if tasklist else None 1426 1427 1428def get_host_status_task(host_id, end_time): 1429 """Get the "status task" for a host from its owning shard. 1430 1431 Finds the given host's owning shard, and forwards to it a call 1432 to `get_status_task()` (see above). 1433 1434 @param host_id Id in the database of the target host. 1435 @param end_time Time reference for the host's status. 1436 1437 @return A single task; its status (successful or not) 1438 corresponds to the status of the host (working or 1439 broken) at the given time. If no task is found, return 1440 `None`. 1441 1442 """ 1443 host = models.Host.smart_get(host_id) 1444 if not host.shard: 1445 return get_status_task(host_id, end_time) 1446 else: 1447 # The return values from AFE methods are post-processed 1448 # objects that aren't JSON-serializable. So, we have to 1449 # call AFE.run() to get the raw, serializable output from 1450 # the shard. 1451 shard_afe = frontend.AFE(server=host.shard.rpc_hostname()) 1452 return shard_afe.run('get_status_task', 1453 host_id=host_id, end_time=end_time) 1454 1455 1456def get_host_diagnosis_interval(host_id, end_time, success): 1457 """Find a "diagnosis interval" for a given host. 1458 1459 A "diagnosis interval" identifies a start and end time where 1460 the host went from "working" to "broken", or vice versa. The 1461 interval's starting time is the starting time of the last status 1462 task with the old status; the end time is the finish time of the 1463 first status task with the new status. 1464 1465 This routine finds the most recent diagnosis interval for the 1466 given host prior to `end_time`, with a starting status matching 1467 `success`. If `success` is true, the interval will start with a 1468 successful status task; if false the interval will start with a 1469 failed status task. 1470 1471 @param host_id Id in the database of the target host. 1472 @param end_time Time reference for the diagnosis interval. 1473 @param success Whether the diagnosis interval should start 1474 with a successful or failed status task. 1475 1476 @return A list of two strings. The first is the timestamp for 1477 the beginning of the interval; the second is the 1478 timestamp for the end. If the host has never changed 1479 state, the list is empty. 1480 1481 """ 1482 host = models.Host.smart_get(host_id) 1483 if not host.shard or utils.is_shard(): 1484 return status_history.get_diagnosis_interval( 1485 host_id, end_time, success) 1486 else: 1487 shard_afe = frontend.AFE(server=host.shard.rpc_hostname()) 1488 return shard_afe.get_host_diagnosis_interval( 1489 host_id, end_time, success) 1490 1491 1492# support for host detail view 1493 1494def get_host_queue_entries_and_special_tasks(host, query_start=None, 1495 query_limit=None, start_time=None, 1496 end_time=None): 1497 """ 1498 @returns an interleaved list of HostQueueEntries and SpecialTasks, 1499 in approximate run order. each dict contains keys for type, host, 1500 job, status, started_on, execution_path, and ID. 1501 """ 1502 total_limit = None 1503 if query_limit is not None: 1504 total_limit = query_start + query_limit 1505 filter_data_common = {'host': host, 1506 'query_limit': total_limit, 1507 'sort_by': ['-id']} 1508 1509 filter_data_special_tasks = rpc_utils.inject_times_to_filter( 1510 'time_started__gte', 'time_started__lte', start_time, end_time, 1511 **filter_data_common) 1512 1513 queue_entries = get_host_queue_entries( 1514 start_time, end_time, **filter_data_common) 1515 special_tasks = get_host_special_tasks(host, **filter_data_special_tasks) 1516 1517 interleaved_entries = rpc_utils.interleave_entries(queue_entries, 1518 special_tasks) 1519 if query_start is not None: 1520 interleaved_entries = interleaved_entries[query_start:] 1521 if query_limit is not None: 1522 interleaved_entries = interleaved_entries[:query_limit] 1523 return rpc_utils.prepare_host_queue_entries_and_special_tasks( 1524 interleaved_entries, queue_entries) 1525 1526 1527def get_num_host_queue_entries_and_special_tasks(host, start_time=None, 1528 end_time=None): 1529 filter_data_common = {'host': host} 1530 1531 filter_data_queue_entries, filter_data_special_tasks = ( 1532 rpc_utils.inject_times_to_hqe_special_tasks_filters( 1533 filter_data_common, start_time, end_time)) 1534 1535 return (models.HostQueueEntry.query_count(filter_data_queue_entries) 1536 + get_host_num_special_tasks(**filter_data_special_tasks)) 1537 1538 1539# recurring run 1540 1541def get_recurring(**filter_data): 1542 return rpc_utils.prepare_rows_as_nested_dicts( 1543 models.RecurringRun.query_objects(filter_data), 1544 ('job', 'owner')) 1545 1546 1547def get_num_recurring(**filter_data): 1548 return models.RecurringRun.query_count(filter_data) 1549 1550 1551def delete_recurring_runs(**filter_data): 1552 to_delete = models.RecurringRun.query_objects(filter_data) 1553 to_delete.delete() 1554 1555 1556def create_recurring_run(job_id, start_date, loop_period, loop_count): 1557 owner = models.User.current_user().login 1558 job = models.Job.objects.get(id=job_id) 1559 return job.create_recurring_job(start_date=start_date, 1560 loop_period=loop_period, 1561 loop_count=loop_count, 1562 owner=owner) 1563 1564 1565# other 1566 1567def echo(data=""): 1568 """\ 1569 Returns a passed in string. For doing a basic test to see if RPC calls 1570 can successfully be made. 1571 """ 1572 return data 1573 1574 1575def get_motd(): 1576 """\ 1577 Returns the message of the day as a string. 1578 """ 1579 return rpc_utils.get_motd() 1580 1581 1582def get_static_data(): 1583 """\ 1584 Returns a dictionary containing a bunch of data that shouldn't change 1585 often and is otherwise inaccessible. This includes: 1586 1587 priorities: List of job priority choices. 1588 default_priority: Default priority value for new jobs. 1589 users: Sorted list of all users. 1590 labels: Sorted list of labels not start with 'cros-version' and 1591 'fw-version'. 1592 atomic_groups: Sorted list of all atomic groups. 1593 tests: Sorted list of all tests. 1594 profilers: Sorted list of all profilers. 1595 current_user: Logged-in username. 1596 host_statuses: Sorted list of possible Host statuses. 1597 job_statuses: Sorted list of possible HostQueueEntry statuses. 1598 job_timeout_default: The default job timeout length in minutes. 1599 parse_failed_repair_default: Default value for the parse_failed_repair job 1600 option. 1601 reboot_before_options: A list of valid RebootBefore string enums. 1602 reboot_after_options: A list of valid RebootAfter string enums. 1603 motd: Server's message of the day. 1604 status_dictionary: A mapping from one word job status names to a more 1605 informative description. 1606 """ 1607 1608 job_fields = models.Job.get_field_dict() 1609 default_drone_set_name = models.DroneSet.default_drone_set_name() 1610 drone_sets = ([default_drone_set_name] + 1611 sorted(drone_set.name for drone_set in 1612 models.DroneSet.objects.exclude( 1613 name=default_drone_set_name))) 1614 1615 result = {} 1616 result['priorities'] = priorities.Priority.choices() 1617 default_priority = priorities.Priority.DEFAULT 1618 result['default_priority'] = 'Default' 1619 result['max_schedulable_priority'] = priorities.Priority.DEFAULT 1620 result['users'] = get_users(sort_by=['login']) 1621 1622 label_exclude_filters = [{'name__startswith': 'cros-version'}, 1623 {'name__startswith': 'fw-version'}, 1624 {'name__startswith': 'fwrw-version'}, 1625 {'name__startswith': 'fwro-version'}, 1626 {'name__startswith': 'ab-version'}, 1627 {'name__startswith': 'testbed-version'}] 1628 result['labels'] = get_labels( 1629 label_exclude_filters, 1630 sort_by=['-platform', 'name']) 1631 1632 result['atomic_groups'] = get_atomic_groups(sort_by=['name']) 1633 result['tests'] = get_tests(sort_by=['name']) 1634 result['profilers'] = get_profilers(sort_by=['name']) 1635 result['current_user'] = rpc_utils.prepare_for_serialization( 1636 models.User.current_user().get_object_dict()) 1637 result['host_statuses'] = sorted(models.Host.Status.names) 1638 result['job_statuses'] = sorted(models.HostQueueEntry.Status.names) 1639 result['job_timeout_mins_default'] = models.Job.DEFAULT_TIMEOUT_MINS 1640 result['job_max_runtime_mins_default'] = ( 1641 models.Job.DEFAULT_MAX_RUNTIME_MINS) 1642 result['parse_failed_repair_default'] = bool( 1643 models.Job.DEFAULT_PARSE_FAILED_REPAIR) 1644 result['reboot_before_options'] = model_attributes.RebootBefore.names 1645 result['reboot_after_options'] = model_attributes.RebootAfter.names 1646 result['motd'] = rpc_utils.get_motd() 1647 result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled() 1648 result['drone_sets'] = drone_sets 1649 result['parameterized_jobs'] = models.Job.parameterized_jobs_enabled() 1650 1651 result['status_dictionary'] = {"Aborted": "Aborted", 1652 "Verifying": "Verifying Host", 1653 "Provisioning": "Provisioning Host", 1654 "Pending": "Waiting on other hosts", 1655 "Running": "Running autoserv", 1656 "Completed": "Autoserv completed", 1657 "Failed": "Failed to complete", 1658 "Queued": "Queued", 1659 "Starting": "Next in host's queue", 1660 "Stopped": "Other host(s) failed verify", 1661 "Parsing": "Awaiting parse of final results", 1662 "Gathering": "Gathering log files", 1663 "Template": "Template job for recurring run", 1664 "Waiting": "Waiting for scheduler action", 1665 "Archiving": "Archiving results", 1666 "Resetting": "Resetting hosts"} 1667 1668 result['wmatrix_url'] = rpc_utils.get_wmatrix_url() 1669 result['is_moblab'] = bool(utils.is_moblab()) 1670 1671 return result 1672 1673 1674def get_server_time(): 1675 return datetime.datetime.now().strftime("%Y-%m-%d %H:%M") 1676