# models.py revision 6964fa53798525f39cd490e26481e8d923815406
1# pylint: disable-msg=C0111 2 3import logging, os 4from datetime import datetime 5import django.core 6try: 7 from django.db import models as dbmodels, connection 8except django.core.exceptions.ImproperlyConfigured: 9 raise ImportError('Django database not yet configured. Import either ' 10 'setup_django_environment or ' 11 'setup_django_lite_environment from ' 12 'autotest_lib.frontend before any imports that ' 13 'depend on django models.') 14from xml.sax import saxutils 15import common 16from autotest_lib.frontend.afe import model_logic, model_attributes 17from autotest_lib.frontend.afe import rdb_model_extensions 18from autotest_lib.frontend import settings, thread_local 19from autotest_lib.client.common_lib import enum, error, host_protections 20from autotest_lib.client.common_lib import global_config 21from autotest_lib.client.common_lib import host_queue_entry_states 22from autotest_lib.client.common_lib import control_data, priorities, decorators 23from autotest_lib.client.common_lib import site_utils 24from autotest_lib.client.common_lib.cros.graphite import es_utils 25 26# job options and user preferences 27DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY 28DEFAULT_REBOOT_AFTER = model_attributes.RebootBefore.NEVER 29 30 31class AclAccessViolation(Exception): 32 """\ 33 Raised when an operation is attempted with proper permissions as 34 dictated by ACLs. 35 """ 36 37 38class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model): 39 """\ 40 An atomic group defines a collection of hosts which must only be scheduled 41 all at once. Any host with a label having an atomic group will only be 42 scheduled for a job at the same time as other hosts sharing that label. 43 44 Required: 45 name: A name for this atomic group, e.g. 'rack23' or 'funky_net'. 46 max_number_of_machines: The maximum number of machines that will be 47 scheduled at once when scheduling jobs to this atomic group. 48 The job.synch_count is considered the minimum. 
49 50 Optional: 51 description: Arbitrary text description of this group's purpose. 52 """ 53 name = dbmodels.CharField(max_length=255, unique=True) 54 description = dbmodels.TextField(blank=True) 55 # This magic value is the default to simplify the scheduler logic. 56 # It must be "large". The common use of atomic groups is to want all 57 # machines in the group to be used, limits on which subset used are 58 # often chosen via dependency labels. 59 # TODO(dennisjeffrey): Revisit this so we don't have to assume that 60 # "infinity" is around 3.3 million. 61 INFINITE_MACHINES = 333333333 62 max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES) 63 invalid = dbmodels.BooleanField(default=False, 64 editable=settings.FULL_ADMIN) 65 66 name_field = 'name' 67 objects = model_logic.ModelWithInvalidManager() 68 valid_objects = model_logic.ValidObjectsManager() 69 70 71 def enqueue_job(self, job, is_template=False): 72 """Enqueue a job on an associated atomic group of hosts. 73 74 @param job: A job to enqueue. 75 @param is_template: Whether the status should be "Template". 76 """ 77 queue_entry = HostQueueEntry.create(atomic_group=self, job=job, 78 is_template=is_template) 79 queue_entry.save() 80 81 82 def clean_object(self): 83 self.label_set.clear() 84 85 86 class Meta: 87 """Metadata for class AtomicGroup.""" 88 db_table = 'afe_atomic_groups' 89 90 91 def __unicode__(self): 92 return unicode(self.name) 93 94 95class Label(model_logic.ModelWithInvalid, dbmodels.Model): 96 """\ 97 Required: 98 name: label name 99 100 Optional: 101 kernel_config: URL/path to kernel config for jobs run on this label. 102 platform: If True, this is a platform label (defaults to False). 103 only_if_needed: If True, a Host with this label can only be used if that 104 label is requested by the job/test (either as the meta_host or 105 in the job_dependencies). 106 atomic_group: The atomic group associated with this label. 
107 """ 108 name = dbmodels.CharField(max_length=255, unique=True) 109 kernel_config = dbmodels.CharField(max_length=255, blank=True) 110 platform = dbmodels.BooleanField(default=False) 111 invalid = dbmodels.BooleanField(default=False, 112 editable=settings.FULL_ADMIN) 113 only_if_needed = dbmodels.BooleanField(default=False) 114 115 name_field = 'name' 116 objects = model_logic.ModelWithInvalidManager() 117 valid_objects = model_logic.ValidObjectsManager() 118 atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) 119 120 121 def clean_object(self): 122 self.host_set.clear() 123 self.test_set.clear() 124 125 126 def enqueue_job(self, job, atomic_group=None, is_template=False): 127 """Enqueue a job on any host of this label. 128 129 @param job: A job to enqueue. 130 @param atomic_group: The associated atomic group. 131 @param is_template: Whether the status should be "Template". 132 """ 133 queue_entry = HostQueueEntry.create(meta_host=self, job=job, 134 is_template=is_template, 135 atomic_group=atomic_group) 136 queue_entry.save() 137 138 139 class Meta: 140 """Metadata for class Label.""" 141 db_table = 'afe_labels' 142 143 def __unicode__(self): 144 return unicode(self.name) 145 146 147class Shard(dbmodels.Model, model_logic.ModelExtensions): 148 149 hostname = dbmodels.CharField(max_length=255, unique=True) 150 151 name_field = 'hostname' 152 153 labels = dbmodels.ManyToManyField(Label, blank=True, 154 db_table='afe_shards_labels') 155 156 class Meta: 157 """Metadata for class ParameterizedJob.""" 158 db_table = 'afe_shards' 159 160 161 def rpc_hostname(self): 162 """Get the rpc hostname of the shard. 163 164 @return: Just the shard hostname for all non-testing environments. 165 The address of the default gateway for vm testing environments. 166 """ 167 # TODO: Figure out a better solution for testing. Since no 2 shards 168 # can run on the same host, if the shard hostname is localhost we 169 # conclude that it must be a vm in a test cluster. 
In such situations 170 # a name of localhost:<port> is necessary to achieve the correct 171 # afe links/redirection from the frontend (this happens through the 172 # host), but for rpcs that are performed *on* the shard, they need to 173 # use the address of the gateway. 174 hostname = self.hostname.split(':')[0] 175 if site_utils.is_localhost(hostname): 176 return self.hostname.replace( 177 hostname, site_utils.DEFAULT_VM_GATEWAY) 178 return self.hostname 179 180 181class Drone(dbmodels.Model, model_logic.ModelExtensions): 182 """ 183 A scheduler drone 184 185 hostname: the drone's hostname 186 """ 187 hostname = dbmodels.CharField(max_length=255, unique=True) 188 189 name_field = 'hostname' 190 objects = model_logic.ExtendedManager() 191 192 193 def save(self, *args, **kwargs): 194 if not User.current_user().is_superuser(): 195 raise Exception('Only superusers may edit drones') 196 super(Drone, self).save(*args, **kwargs) 197 198 199 def delete(self): 200 if not User.current_user().is_superuser(): 201 raise Exception('Only superusers may delete drones') 202 super(Drone, self).delete() 203 204 205 class Meta: 206 """Metadata for class Drone.""" 207 db_table = 'afe_drones' 208 209 def __unicode__(self): 210 return unicode(self.hostname) 211 212 213class DroneSet(dbmodels.Model, model_logic.ModelExtensions): 214 """ 215 A set of scheduler drones 216 217 These will be used by the scheduler to decide what drones a job is allowed 218 to run on. 
219 220 name: the drone set's name 221 drones: the drones that are part of the set 222 """ 223 DRONE_SETS_ENABLED = global_config.global_config.get_config_value( 224 'SCHEDULER', 'drone_sets_enabled', type=bool, default=False) 225 DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value( 226 'SCHEDULER', 'default_drone_set_name', default=None) 227 228 name = dbmodels.CharField(max_length=255, unique=True) 229 drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones') 230 231 name_field = 'name' 232 objects = model_logic.ExtendedManager() 233 234 235 def save(self, *args, **kwargs): 236 if not User.current_user().is_superuser(): 237 raise Exception('Only superusers may edit drone sets') 238 super(DroneSet, self).save(*args, **kwargs) 239 240 241 def delete(self): 242 if not User.current_user().is_superuser(): 243 raise Exception('Only superusers may delete drone sets') 244 super(DroneSet, self).delete() 245 246 247 @classmethod 248 def drone_sets_enabled(cls): 249 """Returns whether drone sets are enabled. 250 251 @param cls: Implicit class object. 252 """ 253 return cls.DRONE_SETS_ENABLED 254 255 256 @classmethod 257 def default_drone_set_name(cls): 258 """Returns the default drone set name. 259 260 @param cls: Implicit class object. 261 """ 262 return cls.DEFAULT_DRONE_SET_NAME 263 264 265 @classmethod 266 def get_default(cls): 267 """Gets the default drone set name, compatible with Job.add_object. 268 269 @param cls: Implicit class object. 270 """ 271 return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME) 272 273 274 @classmethod 275 def resolve_name(cls, drone_set_name): 276 """ 277 Returns the name of one of these, if not None, in order of preference: 278 1) the drone set given, 279 2) the current user's default drone set, or 280 3) the global default drone set 281 282 or returns None if drone sets are disabled 283 284 @param cls: Implicit class object. 285 @param drone_set_name: A drone set name. 
286 """ 287 if not cls.drone_sets_enabled(): 288 return None 289 290 user = User.current_user() 291 user_drone_set_name = user.drone_set and user.drone_set.name 292 293 return drone_set_name or user_drone_set_name or cls.get_default().name 294 295 296 def get_drone_hostnames(self): 297 """ 298 Gets the hostnames of all drones in this drone set 299 """ 300 return set(self.drones.all().values_list('hostname', flat=True)) 301 302 303 class Meta: 304 """Metadata for class DroneSet.""" 305 db_table = 'afe_drone_sets' 306 307 def __unicode__(self): 308 return unicode(self.name) 309 310 311class User(dbmodels.Model, model_logic.ModelExtensions): 312 """\ 313 Required: 314 login :user login name 315 316 Optional: 317 access_level: 0=User (default), 1=Admin, 100=Root 318 """ 319 ACCESS_ROOT = 100 320 ACCESS_ADMIN = 1 321 ACCESS_USER = 0 322 323 AUTOTEST_SYSTEM = 'autotest_system' 324 325 login = dbmodels.CharField(max_length=255, unique=True) 326 access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True) 327 328 # user preferences 329 reboot_before = dbmodels.SmallIntegerField( 330 choices=model_attributes.RebootBefore.choices(), blank=True, 331 default=DEFAULT_REBOOT_BEFORE) 332 reboot_after = dbmodels.SmallIntegerField( 333 choices=model_attributes.RebootAfter.choices(), blank=True, 334 default=DEFAULT_REBOOT_AFTER) 335 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 336 show_experimental = dbmodels.BooleanField(default=False) 337 338 name_field = 'login' 339 objects = model_logic.ExtendedManager() 340 341 342 def save(self, *args, **kwargs): 343 # is this a new object being saved for the first time? 
344 first_time = (self.id is None) 345 user = thread_local.get_user() 346 if user and not user.is_superuser() and user.login != self.login: 347 raise AclAccessViolation("You cannot modify user " + self.login) 348 super(User, self).save(*args, **kwargs) 349 if first_time: 350 everyone = AclGroup.objects.get(name='Everyone') 351 everyone.users.add(self) 352 353 354 def is_superuser(self): 355 """Returns whether the user has superuser access.""" 356 return self.access_level >= self.ACCESS_ROOT 357 358 359 @classmethod 360 def current_user(cls): 361 """Returns the current user. 362 363 @param cls: Implicit class object. 364 """ 365 user = thread_local.get_user() 366 if user is None: 367 user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM) 368 user.access_level = cls.ACCESS_ROOT 369 user.save() 370 return user 371 372 373 @classmethod 374 def get_record(cls, data): 375 """Check the database for an identical record. 376 377 Check for a record with matching id and login. If one exists, 378 return it. If one does not exist there is a possibility that 379 the following cases have happened: 380 1. Same id, different login 381 We received: "1 chromeos-test" 382 And we have: "1 debug-user" 383 In this case we need to delete "1 debug_user" and insert 384 "1 chromeos-test". 385 386 2. Same login, different id: 387 We received: "1 chromeos-test" 388 And we have: "2 chromeos-test" 389 In this case we need to delete "2 chromeos-test" and insert 390 "1 chromeos-test". 391 392 As long as this method deletes bad records and raises the 393 DoesNotExist exception the caller will handle creating the 394 new record. 395 396 @raises: DoesNotExist, if a record with the matching login and id 397 does not exist. 398 """ 399 400 # Both the id and login should be uniqe but there are cases when 401 # we might already have a user with the same login/id because 402 # current_user will proactively create a user record if it doesn't 403 # exist. 
Since we want to avoid conflict between the master and 404 # shard, just delete any existing user records that don't match 405 # what we're about to deserialize from the master. 406 try: 407 return cls.objects.get(login=data['login'], id=data['id']) 408 except cls.DoesNotExist: 409 cls.delete_matching_record(login=data['login']) 410 cls.delete_matching_record(id=data['id']) 411 raise 412 413 414 class Meta: 415 """Metadata for class User.""" 416 db_table = 'afe_users' 417 418 def __unicode__(self): 419 return unicode(self.login) 420 421 422class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel, 423 model_logic.ModelWithAttributes): 424 """\ 425 Required: 426 hostname 427 428 optional: 429 locked: if true, host is locked and will not be queued 430 431 Internal: 432 From AbstractHostModel: 433 synch_id: currently unused 434 status: string describing status of host 435 invalid: true if the host has been deleted 436 protection: indicates what can be done to this host during repair 437 lock_time: DateTime at which the host was locked 438 dirty: true if the host has been used without being rebooted 439 Local: 440 locked_by: user that locked the host, or null if the host is unlocked 441 """ 442 443 SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set', 444 'hostattribute_set', 445 'labels', 446 'shard']) 447 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid']) 448 449 450 def custom_deserialize_relation(self, link, data): 451 assert link == 'shard', 'Link %s should not be deserialized' % link 452 self.shard = Shard.deserialize(data) 453 454 455 # Note: Only specify foreign keys here, specify all native host columns in 456 # rdb_model_extensions instead. 
457 Protection = host_protections.Protection 458 labels = dbmodels.ManyToManyField(Label, blank=True, 459 db_table='afe_hosts_labels') 460 locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False) 461 name_field = 'hostname' 462 objects = model_logic.ModelWithInvalidManager() 463 valid_objects = model_logic.ValidObjectsManager() 464 leased_objects = model_logic.LeasedHostManager() 465 466 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 467 468 def __init__(self, *args, **kwargs): 469 super(Host, self).__init__(*args, **kwargs) 470 self._record_attributes(['status']) 471 472 473 @staticmethod 474 def create_one_time_host(hostname): 475 """Creates a one-time host. 476 477 @param hostname: The name for the host. 478 """ 479 query = Host.objects.filter(hostname=hostname) 480 if query.count() == 0: 481 host = Host(hostname=hostname, invalid=True) 482 host.do_validate() 483 else: 484 host = query[0] 485 if not host.invalid: 486 raise model_logic.ValidationError({ 487 'hostname' : '%s already exists in the autotest DB. ' 488 'Select it rather than entering it as a one time ' 489 'host.' % hostname 490 }) 491 host.protection = host_protections.Protection.DO_NOT_REPAIR 492 host.locked = False 493 host.save() 494 host.clean_object() 495 return host 496 497 498 @classmethod 499 def assign_to_shard(cls, shard, known_ids): 500 """Assigns hosts to a shard. 501 502 For all labels that have been assigned to this shard, all hosts that 503 have this label, are assigned to this shard. 504 505 Hosts that are assigned to the shard but aren't already present on the 506 shard are returned. 507 508 @param shard: The shard object to assign labels/hosts for. 509 @param known_ids: List of all host-ids the shard already knows. 510 This is used to figure out which hosts should be sent 511 to the shard. If shard_ids were used instead, hosts 512 would only be transferred once, even if the client 513 failed persisting them. 
514 The number of hosts usually lies in O(100), so the 515 overhead is acceptable. 516 517 @returns the hosts objects that should be sent to the shard. 518 """ 519 520 # Disclaimer: concurrent heartbeats should theoretically not occur in 521 # the current setup. As they may be introduced in the near future, 522 # this comment will be left here. 523 524 # Sending stuff twice is acceptable, but forgetting something isn't. 525 # Detecting duplicates on the client is easy, but here it's harder. The 526 # following options were considered: 527 # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more 528 # than select returned, as concurrently more hosts might have been 529 # inserted 530 # - UPDATE and then SELECT WHERE shard=shard: select always returns all 531 # hosts for the shard, this is overhead 532 # - SELECT and then UPDATE only selected without requerying afterwards: 533 # returns the old state of the records. 534 host_ids = list(Host.objects.filter( 535 labels=shard.labels.all(), 536 leased=False 537 ).exclude( 538 id__in=known_ids, 539 ).values_list('pk', flat=True)) 540 541 if host_ids: 542 Host.objects.filter(pk__in=host_ids).update(shard=shard) 543 return list(Host.objects.filter(pk__in=host_ids).all()) 544 return [] 545 546 def resurrect_object(self, old_object): 547 super(Host, self).resurrect_object(old_object) 548 # invalid hosts can be in use by the scheduler (as one-time hosts), so 549 # don't change the status 550 self.status = old_object.status 551 552 553 def clean_object(self): 554 self.aclgroup_set.clear() 555 self.labels.clear() 556 557 558 def record_state(self, type_str, state, value, other_metadata=None): 559 """Record metadata in elasticsearch. 560 561 @param type_str: sets the _type field in elasticsearch db. 562 @param state: string representing what state we are recording, 563 e.g. 'locked' 564 @param value: value of the state, e.g. True 565 @param other_metadata: Other metadata to store in metaDB. 
566 """ 567 metadata = { 568 state: value, 569 'hostname': self.hostname, 570 } 571 if other_metadata: 572 metadata = dict(metadata.items() + other_metadata.items()) 573 es_utils.ESMetadata().post(type_str=type_str, metadata=metadata) 574 575 576 def save(self, *args, **kwargs): 577 # extra spaces in the hostname can be a sneaky source of errors 578 self.hostname = self.hostname.strip() 579 # is this a new object being saved for the first time? 580 first_time = (self.id is None) 581 if not first_time: 582 AclGroup.check_for_acl_violation_hosts([self]) 583 # If locked is changed, send its status and user made the change to 584 # metaDB. Locks are important in host history because if a device is 585 # locked then we don't really care what state it is in. 586 if self.locked and not self.locked_by: 587 self.locked_by = User.current_user() 588 self.lock_time = datetime.now() 589 self.record_state('lock_history', 'locked', self.locked, 590 {'changed_by': self.locked_by.login}) 591 self.dirty = True 592 elif not self.locked and self.locked_by: 593 self.record_state('lock_history', 'locked', self.locked, 594 {'changed_by': self.locked_by.login}) 595 self.locked_by = None 596 self.lock_time = None 597 super(Host, self).save(*args, **kwargs) 598 if first_time: 599 everyone = AclGroup.objects.get(name='Everyone') 600 everyone.hosts.add(self) 601 self._check_for_updated_attributes() 602 603 604 def delete(self): 605 AclGroup.check_for_acl_violation_hosts([self]) 606 for queue_entry in self.hostqueueentry_set.all(): 607 queue_entry.deleted = True 608 queue_entry.abort() 609 super(Host, self).delete() 610 611 612 def on_attribute_changed(self, attribute, old_value): 613 assert attribute == 'status' 614 logging.info(self.hostname + ' -> ' + self.status) 615 616 617 def enqueue_job(self, job, atomic_group=None, is_template=False): 618 """Enqueue a job on this host. 619 620 @param job: A job to enqueue. 621 @param atomic_group: The associated atomic group. 
622 @param is_template: Whther the status should be "Template". 623 """ 624 queue_entry = HostQueueEntry.create(host=self, job=job, 625 is_template=is_template, 626 atomic_group=atomic_group) 627 # allow recovery of dead hosts from the frontend 628 if not self.active_queue_entry() and self.is_dead(): 629 self.status = Host.Status.READY 630 self.save() 631 queue_entry.save() 632 633 block = IneligibleHostQueue(job=job, host=self) 634 block.save() 635 636 637 def platform(self): 638 """The platform of the host.""" 639 # TODO(showard): slighly hacky? 640 platforms = self.labels.filter(platform=True) 641 if len(platforms) == 0: 642 return None 643 return platforms[0] 644 platform.short_description = 'Platform' 645 646 647 @classmethod 648 def check_no_platform(cls, hosts): 649 """Verify the specified hosts have no associated platforms. 650 651 @param cls: Implicit class object. 652 @param hosts: The hosts to verify. 653 @raises model_logic.ValidationError if any hosts already have a 654 platform. 
655 """ 656 Host.objects.populate_relationships(hosts, Label, 'label_list') 657 errors = [] 658 for host in hosts: 659 platforms = [label.name for label in host.label_list 660 if label.platform] 661 if platforms: 662 # do a join, just in case this host has multiple platforms, 663 # we'll be able to see it 664 errors.append('Host %s already has a platform: %s' % ( 665 host.hostname, ', '.join(platforms))) 666 if errors: 667 raise model_logic.ValidationError({'labels': '; '.join(errors)}) 668 669 670 def is_dead(self): 671 """Returns whether the host is dead (has status repair failed).""" 672 return self.status == Host.Status.REPAIR_FAILED 673 674 675 def active_queue_entry(self): 676 """Returns the active queue entry for this host, or None if none.""" 677 active = list(self.hostqueueentry_set.filter(active=True)) 678 if not active: 679 return None 680 assert len(active) == 1, ('More than one active entry for ' 681 'host ' + self.hostname) 682 return active[0] 683 684 685 def _get_attribute_model_and_args(self, attribute): 686 return HostAttribute, dict(host=self, attribute=attribute) 687 688 689 class Meta: 690 """Metadata for the Host class.""" 691 db_table = 'afe_hosts' 692 693 def __unicode__(self): 694 return unicode(self.hostname) 695 696 697class HostAttribute(dbmodels.Model, model_logic.ModelExtensions): 698 """Arbitrary keyvals associated with hosts.""" 699 host = dbmodels.ForeignKey(Host) 700 attribute = dbmodels.CharField(max_length=90) 701 value = dbmodels.CharField(max_length=300) 702 703 objects = model_logic.ExtendedManager() 704 705 class Meta: 706 """Metadata for the HostAttribute class.""" 707 db_table = 'afe_host_attributes' 708 709 710class Test(dbmodels.Model, model_logic.ModelExtensions): 711 """\ 712 Required: 713 author: author name 714 description: description of the test 715 name: test name 716 time: short, medium, long 717 test_class: This describes the class for your the test belongs in. 
718 test_category: This describes the category for your tests 719 test_type: Client or Server 720 path: path to pass to run_test() 721 sync_count: is a number >=1 (1 being the default). If it's 1, then it's an 722 async job. If it's >1 it's sync job for that number of machines 723 i.e. if sync_count = 2 it is a sync job that requires two 724 machines. 725 Optional: 726 dependencies: What the test requires to run. Comma deliminated list 727 dependency_labels: many-to-many relationship with labels corresponding to 728 test dependencies. 729 experimental: If this is set to True production servers will ignore the test 730 run_verify: Whether or not the scheduler should run the verify stage 731 run_reset: Whether or not the scheduler should run the reset stage 732 test_retry: Number of times to retry test if the test did not complete 733 successfully. (optional, default: 0) 734 """ 735 TestTime = enum.Enum('SHORT', 'MEDIUM', 'LONG', start_value=1) 736 737 name = dbmodels.CharField(max_length=255, unique=True) 738 author = dbmodels.CharField(max_length=255) 739 test_class = dbmodels.CharField(max_length=255) 740 test_category = dbmodels.CharField(max_length=255) 741 dependencies = dbmodels.CharField(max_length=255, blank=True) 742 description = dbmodels.TextField(blank=True) 743 experimental = dbmodels.BooleanField(default=True) 744 run_verify = dbmodels.BooleanField(default=False) 745 test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(), 746 default=TestTime.MEDIUM) 747 test_type = dbmodels.SmallIntegerField( 748 choices=control_data.CONTROL_TYPE.choices()) 749 sync_count = dbmodels.IntegerField(default=1) 750 path = dbmodels.CharField(max_length=255, unique=True) 751 test_retry = dbmodels.IntegerField(blank=True, default=0) 752 run_reset = dbmodels.BooleanField(default=True) 753 754 dependency_labels = ( 755 dbmodels.ManyToManyField(Label, blank=True, 756 db_table='afe_autotests_dependency_labels')) 757 name_field = 'name' 758 objects = 
model_logic.ExtendedManager() 759 760 761 def admin_description(self): 762 """Returns a string representing the admin description.""" 763 escaped_description = saxutils.escape(self.description) 764 return '<span style="white-space:pre">%s</span>' % escaped_description 765 admin_description.allow_tags = True 766 admin_description.short_description = 'Description' 767 768 769 class Meta: 770 """Metadata for class Test.""" 771 db_table = 'afe_autotests' 772 773 def __unicode__(self): 774 return unicode(self.name) 775 776 777class TestParameter(dbmodels.Model): 778 """ 779 A declared parameter of a test 780 """ 781 test = dbmodels.ForeignKey(Test) 782 name = dbmodels.CharField(max_length=255) 783 784 class Meta: 785 """Metadata for class TestParameter.""" 786 db_table = 'afe_test_parameters' 787 unique_together = ('test', 'name') 788 789 def __unicode__(self): 790 return u'%s (%s)' % (self.name, self.test.name) 791 792 793class Profiler(dbmodels.Model, model_logic.ModelExtensions): 794 """\ 795 Required: 796 name: profiler name 797 test_type: Client or Server 798 799 Optional: 800 description: arbirary text description 801 """ 802 name = dbmodels.CharField(max_length=255, unique=True) 803 description = dbmodels.TextField(blank=True) 804 805 name_field = 'name' 806 objects = model_logic.ExtendedManager() 807 808 809 class Meta: 810 """Metadata for class Profiler.""" 811 db_table = 'afe_profilers' 812 813 def __unicode__(self): 814 return unicode(self.name) 815 816 817class AclGroup(dbmodels.Model, model_logic.ModelExtensions): 818 """\ 819 Required: 820 name: name of ACL group 821 822 Optional: 823 description: arbitrary description of group 824 """ 825 826 SERIALIZATION_LINKS_TO_FOLLOW = set(['users']) 827 828 name = dbmodels.CharField(max_length=255, unique=True) 829 description = dbmodels.CharField(max_length=255, blank=True) 830 users = dbmodels.ManyToManyField(User, blank=False, 831 db_table='afe_acl_groups_users') 832 hosts = dbmodels.ManyToManyField(Host, 
blank=True, 833 db_table='afe_acl_groups_hosts') 834 835 name_field = 'name' 836 objects = model_logic.ExtendedManager() 837 838 @staticmethod 839 def check_for_acl_violation_hosts(hosts): 840 """Verify the current user has access to the specified hosts. 841 842 @param hosts: The hosts to verify against. 843 @raises AclAccessViolation if the current user doesn't have access 844 to a host. 845 """ 846 user = User.current_user() 847 if user.is_superuser(): 848 return 849 accessible_host_ids = set( 850 host.id for host in Host.objects.filter(aclgroup__users=user)) 851 for host in hosts: 852 # Check if the user has access to this host, 853 # but only if it is not a metahost or a one-time-host. 854 no_access = (isinstance(host, Host) 855 and not host.invalid 856 and int(host.id) not in accessible_host_ids) 857 if no_access: 858 raise AclAccessViolation("%s does not have access to %s" % 859 (str(user), str(host))) 860 861 862 @staticmethod 863 def check_abort_permissions(queue_entries): 864 """Look for queue entries that aren't abortable by the current user. 865 866 An entry is not abortable if: 867 * the job isn't owned by this user, and 868 * the machine isn't ACL-accessible, or 869 * the machine is in the "Everyone" ACL 870 871 @param queue_entries: The queue entries to check. 872 @raises AclAccessViolation if a queue entry is not abortable by the 873 current user. 874 """ 875 user = User.current_user() 876 if user.is_superuser(): 877 return 878 not_owned = queue_entries.exclude(job__owner=user.login) 879 # I do this using ID sets instead of just Django filters because 880 # filtering on M2M dbmodels is broken in Django 0.96. It's better in 881 # 1.0. 882 # TODO: Use Django filters, now that we're using 1.0. 
883 accessible_ids = set( 884 entry.id for entry 885 in not_owned.filter(host__aclgroup__users__login=user.login)) 886 public_ids = set(entry.id for entry 887 in not_owned.filter(host__aclgroup__name='Everyone')) 888 cannot_abort = [entry for entry in not_owned.select_related() 889 if entry.id not in accessible_ids 890 or entry.id in public_ids] 891 if len(cannot_abort) == 0: 892 return 893 entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner, 894 entry.host_or_metahost_name()) 895 for entry in cannot_abort) 896 raise AclAccessViolation('You cannot abort the following job entries: ' 897 + entry_names) 898 899 900 def check_for_acl_violation_acl_group(self): 901 """Verifies the current user has acces to this ACL group. 902 903 @raises AclAccessViolation if the current user doesn't have access to 904 this ACL group. 905 """ 906 user = User.current_user() 907 if user.is_superuser(): 908 return 909 if self.name == 'Everyone': 910 raise AclAccessViolation("You cannot modify 'Everyone'!") 911 if not user in self.users.all(): 912 raise AclAccessViolation("You do not have access to %s" 913 % self.name) 914 915 @staticmethod 916 def on_host_membership_change(): 917 """Invoked when host membership changes.""" 918 everyone = AclGroup.objects.get(name='Everyone') 919 920 # find hosts that aren't in any ACL group and add them to Everyone 921 # TODO(showard): this is a bit of a hack, since the fact that this query 922 # works is kind of a coincidence of Django internals. This trick 923 # doesn't work in general (on all foreign key relationships). I'll 924 # replace it with a better technique when the need arises. 
925 orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True) 926 everyone.hosts.add(*orphaned_hosts.distinct()) 927 928 # find hosts in both Everyone and another ACL group, and remove them 929 # from Everyone 930 hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone') 931 acled_hosts = set() 932 for host in hosts_in_everyone: 933 # Has an ACL group other than Everyone 934 if host.aclgroup_set.count() > 1: 935 acled_hosts.add(host) 936 everyone.hosts.remove(*acled_hosts) 937 938 939 def delete(self): 940 if (self.name == 'Everyone'): 941 raise AclAccessViolation("You cannot delete 'Everyone'!") 942 self.check_for_acl_violation_acl_group() 943 super(AclGroup, self).delete() 944 self.on_host_membership_change() 945 946 947 def add_current_user_if_empty(self): 948 """Adds the current user if the set of users is empty.""" 949 if not self.users.count(): 950 self.users.add(User.current_user()) 951 952 953 def perform_after_save(self, change): 954 """Called after a save. 955 956 @param change: Whether there was a change. 957 """ 958 if not change: 959 self.users.add(User.current_user()) 960 self.add_current_user_if_empty() 961 self.on_host_membership_change() 962 963 964 def save(self, *args, **kwargs): 965 change = bool(self.id) 966 if change: 967 # Check the original object for an ACL violation 968 AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group() 969 super(AclGroup, self).save(*args, **kwargs) 970 self.perform_after_save(change) 971 972 973 class Meta: 974 """Metadata for class AclGroup.""" 975 db_table = 'afe_acl_groups' 976 977 def __unicode__(self): 978 return unicode(self.name) 979 980 981class Kernel(dbmodels.Model): 982 """ 983 A kernel configuration for a parameterized job 984 """ 985 version = dbmodels.CharField(max_length=255) 986 cmdline = dbmodels.CharField(max_length=255, blank=True) 987 988 @classmethod 989 def create_kernels(cls, kernel_list): 990 """Creates all kernels in the kernel list. 

        @param cls: Implicit class object.
        @param kernel_list: A list of dictionaries that describe the kernels,
                            in the same format as the 'kernel' argument to
                            rpc_interface.generate_control_file.
        @return A list of the created kernels.
        """
        if not kernel_list:
            return None
        return [cls._create(kernel) for kernel in kernel_list]


    @classmethod
    def _create(cls, kernel_dict):
        """Creates (or fetches) one kernel from its dict description.

        @param kernel_dict: Dict with a required 'version' key and an optional
                            'cmdline' key; no other keys are allowed.
        @return The (possibly pre-existing) Kernel instance.
        @raises Exception if unexpected keys remain in kernel_dict.
        """
        version = kernel_dict.pop('version')
        cmdline = kernel_dict.pop('cmdline', '')

        if kernel_dict:
            raise Exception('Extraneous kernel arguments remain: %r'
                            % kernel_dict)

        kernel, _ = cls.objects.get_or_create(version=version,
                                              cmdline=cmdline)
        return kernel


    class Meta:
        """Metadata for class Kernel."""
        db_table = 'afe_kernels'
        unique_together = ('version', 'cmdline')

    def __unicode__(self):
        return u'%s %s' % (self.version, self.cmdline)


class ParameterizedJob(dbmodels.Model):
    """
    Auxiliary configuration for a parameterized job.
    """
    test = dbmodels.ForeignKey(Test)
    label = dbmodels.ForeignKey(Label, null=True)
    use_container = dbmodels.BooleanField(default=False)
    profile_only = dbmodels.BooleanField(default=False)
    upload_kernel_config = dbmodels.BooleanField(default=False)

    kernels = dbmodels.ManyToManyField(
            Kernel, db_table='afe_parameterized_job_kernels')
    profilers = dbmodels.ManyToManyField(
            Profiler, through='ParameterizedJobProfiler')


    @classmethod
    def smart_get(cls, id_or_name, *args, **kwargs):
        """For compatibility with Job.add_object.

        @param cls: Implicit class object.
        @param id_or_name: The ID or name to get.
        @param args: Non-keyword arguments.
        @param kwargs: Keyword arguments.
        """
        return cls.objects.get(pk=id_or_name)


    def job(self):
        """Returns the job if it exists, or else None."""
        jobs = self.job_set.all()
        assert jobs.count() <= 1
        return jobs and jobs[0] or None


    class Meta:
        """Metadata for class ParameterizedJob."""
        db_table = 'afe_parameterized_jobs'

    def __unicode__(self):
        return u'%s (parameterized) - %s' % (self.test.name, self.job())


class ParameterizedJobProfiler(dbmodels.Model):
    """
    A profiler to run on a parameterized job
    """
    parameterized_job = dbmodels.ForeignKey(ParameterizedJob)
    profiler = dbmodels.ForeignKey(Profiler)

    class Meta:
        """Metadata for class ParameterizedJobProfiler."""
        db_table = 'afe_parameterized_jobs_profilers'
        unique_together = ('parameterized_job', 'profiler')


class ParameterizedJobProfilerParameter(dbmodels.Model):
    """
    A parameter for a profiler in a parameterized job
    """
    parameterized_job_profiler = dbmodels.ForeignKey(ParameterizedJobProfiler)
    parameter_name = dbmodels.CharField(max_length=255)
    parameter_value = dbmodels.TextField()
    parameter_type = dbmodels.CharField(
            max_length=8, choices=model_attributes.ParameterTypes.choices())

    class Meta:
        """Metadata for class ParameterizedJobProfilerParameter."""
        db_table = 'afe_parameterized_job_profiler_parameters'
        unique_together = ('parameterized_job_profiler', 'parameter_name')

    def __unicode__(self):
        return u'%s - %s' % (self.parameterized_job_profiler.profiler.name,
                             self.parameter_name)


class ParameterizedJobParameter(dbmodels.Model):
    """
    Parameters for a parameterized job
    """
    parameterized_job = dbmodels.ForeignKey(ParameterizedJob)
    test_parameter = dbmodels.ForeignKey(TestParameter)
    parameter_value = dbmodels.TextField()
    parameter_type = dbmodels.CharField(
max_length=8, choices=model_attributes.ParameterTypes.choices()) 1111 1112 class Meta: 1113 """Metadata for class ParameterizedJobParameter.""" 1114 db_table = 'afe_parameterized_job_parameters' 1115 unique_together = ('parameterized_job', 'test_parameter') 1116 1117 def __unicode__(self): 1118 return u'%s - %s' % (self.parameterized_job.job().name, 1119 self.test_parameter.name) 1120 1121 1122class JobManager(model_logic.ExtendedManager): 1123 'Custom manager to provide efficient status counts querying.' 1124 def get_status_counts(self, job_ids): 1125 """Returns a dict mapping the given job IDs to their status count dicts. 1126 1127 @param job_ids: A list of job IDs. 1128 """ 1129 if not job_ids: 1130 return {} 1131 id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids) 1132 cursor = connection.cursor() 1133 cursor.execute(""" 1134 SELECT job_id, status, aborted, complete, COUNT(*) 1135 FROM afe_host_queue_entries 1136 WHERE job_id IN %s 1137 GROUP BY job_id, status, aborted, complete 1138 """ % id_list) 1139 all_job_counts = dict((job_id, {}) for job_id in job_ids) 1140 for job_id, status, aborted, complete, count in cursor.fetchall(): 1141 job_dict = all_job_counts[job_id] 1142 full_status = HostQueueEntry.compute_full_status(status, aborted, 1143 complete) 1144 job_dict.setdefault(full_status, 0) 1145 job_dict[full_status] += count 1146 return all_job_counts 1147 1148 1149class Job(dbmodels.Model, model_logic.ModelExtensions): 1150 """\ 1151 owner: username of job owner 1152 name: job name (does not have to be unique) 1153 priority: Integer priority value. Higher is more important. 
    control_file: contents of control file
    control_type: Client or Server
    created_on: date of job creation
    submitted_on: date of job submission
    synch_count: how many hosts should be used per autoserv execution
    run_verify: Whether or not to run the verify phase
    run_reset: Whether or not to run the reset phase
    timeout: DEPRECATED - hours from queuing time until job times out
    timeout_mins: minutes from job queuing time until the job times out
    max_runtime_hrs: DEPRECATED - hours from job starting time until job
        times out
    max_runtime_mins: minutes from job starting time until job times out
    email_list: list of people to email on completion delimited by any of:
        white space, ',', ':', ';'
    dependency_labels: many-to-many relationship with labels corresponding to
        job dependencies
    reboot_before: Never, If dirty, or Always
    reboot_after: Never, If all tests passed, or Always
    parse_failed_repair: if True, a failed repair launched by this job will have
        its results parsed as part of the job.
    drone_set: The set of drones to run this job on
    parent_job: Parent job (optional)
    test_retry: Number of times to retry test if the test did not complete
        successfully. (optional, default: 0)
    """

    # TODO: Investigate, if jobkeyval_set is really needed.
    # dynamic_suite will write them into an attached file for the drone, but
    # it doesn't seem like they are actually used. If they aren't used, remove
    # jobkeyval_set here.
    SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels',
                                         'hostqueueentry_set',
                                         'jobkeyval_set',
                                         'shard'])


    def _deserialize_relation(self, link, data):
        """Patches the parent job id into deserialized child rows before
        delegating to the generic deserialization in ModelExtensions."""
        if link in ['hostqueueentry_set', 'jobkeyval_set']:
            for obj in data:
                obj['job_id'] = self.id

        super(Job, self)._deserialize_relation(link, data)


    def custom_deserialize_relation(self, link, data):
        """Deserializes the 'shard' link; no other link is expected here."""
        assert link == 'shard', 'Link %s should not be deserialized' % link
        self.shard = Shard.deserialize(data)


    def sanity_check_update_from_shard(self, shard, updated_serialized):
        """Ensures an update pushed by a shard belongs to that shard."""
        # If the job got aborted on the master after the client fetched it
        # no shard_id will be set. The shard might still push updates though,
        # as the job might complete before the abort bit syncs to the shard.
        # Alternative considered: The master scheduler could be changed to not
        # set aborted jobs to completed that are sharded out. But that would
        # require database queries and seemed more complicated to implement.
        # This seems safe to do, as there won't be updates pushed from the
        # wrong shards: shards should be powered off and wiped when they are
        # removed from the master.
        if self.shard_id and self.shard_id != shard.id:
            raise error.UnallowedRecordsSentToMaster(
                'Job id=%s is assigned to shard (%s). Cannot update it with %s '
                'from shard %s.' % (self.id, self.shard_id, updated_serialized,
                                    shard.id))


    # TIMEOUT is deprecated.
    DEFAULT_TIMEOUT = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_default', default=24)
    DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60)
    # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is
    # completed.
    DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72)
    DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60)
    DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool,
        default=False)

    owner = dbmodels.CharField(max_length=255)
    name = dbmodels.CharField(max_length=255)
    priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT)
    control_file = dbmodels.TextField(null=True, blank=True)
    control_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices(),
        blank=True,  # to allow 0
        default=control_data.CONTROL_TYPE.CLIENT)
    created_on = dbmodels.DateTimeField()
    synch_count = dbmodels.IntegerField(blank=True, default=0)
    timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT)
    run_verify = dbmodels.BooleanField(default=False)
    email_list = dbmodels.CharField(max_length=250, blank=True)
    dependency_labels = (
            dbmodels.ManyToManyField(Label, blank=True,
                                     db_table='afe_jobs_dependency_labels'))
    reboot_before = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootBefore.choices(), blank=True,
        default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootAfter.choices(), blank=True,
        default=DEFAULT_REBOOT_AFTER)
    parse_failed_repair = dbmodels.BooleanField(
        default=DEFAULT_PARSE_FAILED_REPAIR)
    # max_runtime_hrs is deprecated. Will be removed after switch to mins is
    # completed.
    max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS)
    max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS)
    drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)

    parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True,
                                            blank=True)

    parent_job = dbmodels.ForeignKey('self', blank=True, null=True)

    test_retry = dbmodels.IntegerField(blank=True, default=0)

    run_reset = dbmodels.BooleanField(default=True)

    timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS)

    # If this is None on the master, a slave should be found.
    # If this is None on a slave, it should be synced back to the master
    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    # custom manager
    objects = JobManager()


    @decorators.cached_property
    def labels(self):
        """All the labels of this job"""
        # We need to convert dependency_labels to a list, because all() gives us
        # back an iterator, and storing/caching an iterator means we'd only be
        # able to read from it once.
        return list(self.dependency_labels.all())


    def is_server_job(self):
        """Returns whether this job is of type server."""
        return self.control_type == control_data.CONTROL_TYPE.SERVER


    @classmethod
    def parameterized_jobs_enabled(cls):
        """Returns whether parameterized jobs are enabled.

        @param cls: Implicit class object.
        """
        return global_config.global_config.get_config_value(
                'AUTOTEST_WEB', 'parameterized_jobs', type=bool)


    @classmethod
    def check_parameterized_job(cls, control_file, parameterized_job):
        """Checks that the job is valid given the global config settings.

        First, either control_file must be set, or parameterized_job must be
        set, but not both.
        Second, parameterized_job must be set if and only if
        the parameterized_jobs option in the global config is set to True.

        @param cls: Implicit class object.
        @param control_file: A control file.
        @param parameterized_job: A parameterized job.
        """
        if not (bool(control_file) ^ bool(parameterized_job)):
            raise Exception('Job must have either control file or '
                            'parameterization, but not both')

        parameterized_jobs_enabled = cls.parameterized_jobs_enabled()
        if control_file and parameterized_jobs_enabled:
            raise Exception('Control file specified, but parameterized jobs '
                            'are enabled')
        if parameterized_job and not parameterized_jobs_enabled:
            raise Exception('Parameterized job specified, but parameterized '
                            'jobs are not enabled')


    @classmethod
    def create(cls, owner, options, hosts):
        """Creates a job.

        The job is created by taking some information (the listed args) and
        filling in the rest of the necessary information.

        @param cls: Implicit class object.
        @param owner: The owner for the job.
        @param options: An options object.
        @param hosts: The hosts to use.
        """
        AclGroup.check_for_acl_violation_hosts(hosts)

        control_file = options.get('control_file')
        parameterized_job = options.get('parameterized_job')

        # The current implementation of parameterized jobs requires that only
        # control files or parameterized jobs are used. Using the image
        # parameter on autoupdate_ParameterizedJob doesn't mix pure
        # parameterized jobs and control files jobs, it does muck enough with
        # normal jobs by adding a parameterized id to them that this check will
        # fail. So for now we just skip this check.
        # cls.check_parameterized_job(control_file=control_file,
        #                             parameterized_job=parameterized_job)
        user = User.current_user()
        # Fall back to the user's personal reboot preferences when the
        # caller did not specify them explicitly.
        if options.get('reboot_before') is None:
            options['reboot_before'] = user.get_reboot_before_display()
        if options.get('reboot_after') is None:
            options['reboot_after'] = user.get_reboot_after_display()

        drone_set = DroneSet.resolve_name(options.get('drone_set'))

        # Honor the deprecated hours-based timeout if only it was given.
        if options.get('timeout_mins') is None and options.get('timeout'):
            options['timeout_mins'] = options['timeout'] * 60

        job = cls.add_object(
            owner=owner,
            name=options['name'],
            priority=options['priority'],
            control_file=control_file,
            control_type=options['control_type'],
            synch_count=options.get('synch_count'),
            # timeout needs to be deleted in the future.
            timeout=options.get('timeout'),
            timeout_mins=options.get('timeout_mins'),
            max_runtime_mins=options.get('max_runtime_mins'),
            run_verify=options.get('run_verify'),
            email_list=options.get('email_list'),
            reboot_before=options.get('reboot_before'),
            reboot_after=options.get('reboot_after'),
            parse_failed_repair=options.get('parse_failed_repair'),
            created_on=datetime.now(),
            drone_set=drone_set,
            parameterized_job=parameterized_job,
            parent_job=options.get('parent_job_id'),
            test_retry=options.get('test_retry'),
            run_reset=options.get('run_reset'))

        job.dependency_labels = options['dependencies']

        if options.get('keyvals'):
            for key, value in options['keyvals'].iteritems():
                JobKeyval.objects.create(job=job, key=key, value=value)

        return job


    @classmethod
    def assign_to_shard(cls, shard, known_ids):
        """Assigns unassigned jobs to a shard.

        For all labels that have been assigned to this shard, all jobs that
        have this label, are assigned to this shard.

        Jobs that are assigned to the shard but aren't already present on the
        shard are returned.

        @param shard: The shard to assign jobs to.
        @param known_ids: List of all ids of incomplete jobs, the shard already
                          knows about.
                          This is used to figure out which jobs should be sent
                          to the shard. If shard_ids were used instead, jobs
                          would only be transferred once, even if the client
                          failed persisting them.
                          The number of unfinished jobs usually lies in O(1000).
                          Assuming one id takes 8 chars in the json, this means
                          overhead that lies in the lower kilobyte range.
                          A not in query with 5000 id's takes about 30ms.

        @returns The job objects that should be sent to the shard.
        """
        # Disclaimer: Concurrent heartbeats should not occur in today's setup.
        # If this changes or they are triggered manually, this applies:
        # Jobs may be returned more than once by concurrent calls of this
        # function, as there is a race condition between SELECT and UPDATE.
        job_ids = list(Job.objects.filter(
            dependency_labels=shard.labels.all()
            ).exclude(
            id__in=known_ids,
            hostqueueentry__aborted=False
            ).exclude(
            hostqueueentry__complete=True
            ).exclude(
            hostqueueentry__active=True
            ).distinct().values_list('pk', flat=True))
        if job_ids:
            Job.objects.filter(pk__in=job_ids).update(shard=shard)
            return list(Job.objects.filter(pk__in=job_ids).all())
        return []


    def save(self, *args, **kwargs):
        # The current implementation of parameterized jobs requires that only
        # control files or parameterized jobs are used. Using the image
        # parameter on autoupdate_ParameterizedJob doesn't mix pure
        # parameterized jobs and control files jobs, it does muck enough with
        # normal jobs by adding a parameterized id to them that this check will
        # fail. So for now we just skip this check.
        # cls.check_parameterized_job(control_file=self.control_file,
        #                             parameterized_job=self.parameterized_job)
        super(Job, self).save(*args, **kwargs)


    def queue(self, hosts, atomic_group=None, is_template=False):
        """Enqueue a job on the given hosts.

        @param hosts: The hosts to use.
        @param atomic_group: The associated atomic group.
        @param is_template: Whether the status should be "Template".
        """
        if not hosts:
            if atomic_group:
                # No hosts or labels are required to queue an atomic group
                # Job.  However, if they are given, we respect them below.
                atomic_group.enqueue_job(self, is_template=is_template)
            else:
                # hostless job
                entry = HostQueueEntry.create(job=self, is_template=is_template)
                entry.save()
            return

        for host in hosts:
            host.enqueue_job(self, atomic_group=atomic_group,
                             is_template=is_template)


    def create_recurring_job(self, start_date, loop_period, loop_count, owner):
        """Creates a recurring job.

        @param start_date: The starting date of the job.
        @param loop_period: How often to re-run the job, in seconds.
        @param loop_count: The re-run count.
        @param owner: The owner of the job.
        """
        rec = RecurringRun(job=self, start_date=start_date,
                           loop_period=loop_period,
                           loop_count=loop_count,
                           owner=User.objects.get(login=owner))
        rec.save()
        return rec.id


    def user(self):
        """Gets the user of this job, or None if it doesn't exist."""
        try:
            return User.objects.get(login=self.owner)
        except self.DoesNotExist:
            return None


    def abort(self):
        """Aborts this job."""
        for queue_entry in self.hostqueueentry_set.all():
            queue_entry.abort()


    def tag(self):
        """Returns a string tag for this job."""
        return '%s-%s' % (self.id, self.owner)


    def keyval_dict(self):
        """Returns all keyvals for this job as a dictionary."""
        return dict((keyval.key, keyval.value)
                    for keyval in self.jobkeyval_set.all())


    class Meta:
        """Metadata for class Job."""
        db_table = 'afe_jobs'

    def __unicode__(self):
        return u'%s (%s-%s)' % (self.name, self.id, self.owner)


class JobKeyval(dbmodels.Model, model_logic.ModelExtensions):
    """Keyvals associated with jobs"""
    job = dbmodels.ForeignKey(Job)
    key = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class JobKeyval."""
        db_table = 'afe_job_keyvals'


class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions):
    """Represents an ineligible host queue."""
    job = dbmodels.ForeignKey(Job)
    host = dbmodels.ForeignKey(Host)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class IneligibleHostQueue."""
        db_table = 'afe_ineligible_host_queues'


class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
    """Represents a host queue entry."""

    SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host'])

    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted'])


    def custom_deserialize_relation(self, link, data):
        """Deserializes the 'meta_host' link; no other link is expected."""
        assert link == 'meta_host'
        self.meta_host = Label.deserialize(data)


    def sanity_check_update_from_shard(self, shard, updated_serialized,
                                       job_ids_sent):
        """Rejects shard updates for entries whose job was not sent along."""
        if self.job_id not in job_ids_sent:
            raise error.UnallowedRecordsSentToMaster(
                'Sent HostQueueEntry without corresponding '
                'job entry: %s' % updated_serialized)


    Status = host_queue_entry_states.Status
    ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES
    COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES

    job = dbmodels.ForeignKey(Job)
    host = dbmodels.ForeignKey(Host, blank=True, null=True)
    status = dbmodels.CharField(max_length=255)
    meta_host = dbmodels.ForeignKey(Label, blank=True, null=True,
                                    db_column='meta_host')
    active = dbmodels.BooleanField(default=False)
    complete = dbmodels.BooleanField(default=False)
    deleted = dbmodels.BooleanField(default=False)
    execution_subdir = dbmodels.CharField(max_length=255, blank=True,
                                          default='')
    # If atomic_group is set, this is a virtual HostQueueEntry that will
    # be expanded into many actual hosts within the group at schedule time.
    atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True)
    aborted = dbmodels.BooleanField(default=False)
    started_on = dbmodels.DateTimeField(null=True, blank=True)
    finished_on = dbmodels.DateTimeField(null=True, blank=True)

    objects = model_logic.ExtendedManager()


    def __init__(self, *args, **kwargs):
        super(HostQueueEntry, self).__init__(*args, **kwargs)
        # Track 'status' so on_attribute_changed fires when it is modified.
        self._record_attributes(['status'])


    @classmethod
    def create(cls, job, host=None, meta_host=None, atomic_group=None,
               is_template=False):
        """Creates a new host queue entry.

        @param cls: Implicit class object.
        @param job: The associated job.
        @param host: The associated host.
        @param meta_host: The associated meta host.
        @param atomic_group: The associated atomic group.
        @param is_template: Whether the status should be "Template".
        """
        if is_template:
            status = cls.Status.TEMPLATE
        else:
            status = cls.Status.QUEUED

        return cls(job=job, host=host, meta_host=meta_host,
                   atomic_group=atomic_group, status=status)


    def save(self, *args, **kwargs):
        # Keep the active/complete flags consistent with status before
        # persisting.
        self._set_active_and_complete()
        super(HostQueueEntry, self).save(*args, **kwargs)
        self._check_for_updated_attributes()


    def execution_path(self):
        """
        Path to this entry's results (relative to the base results directory).
        """
        return os.path.join(self.job.tag(), self.execution_subdir)


    def host_or_metahost_name(self):
        """Returns the first non-None name found in priority order.

        The priority order checked is: (1) host name; (2) meta host name; and
        (3) atomic group name.
        """
        if self.host:
            return self.host.hostname
        elif self.meta_host:
            return self.meta_host.name
        else:
            assert self.atomic_group, "no host, meta_host or atomic group!"
            return self.atomic_group.name


    def _set_active_and_complete(self):
        # Derive the two redundant boolean flags from the status string.
        if self.status in self.ACTIVE_STATUSES:
            self.active, self.complete = True, False
        elif self.status in self.COMPLETE_STATUSES:
            self.active, self.complete = False, True
        else:
            self.active, self.complete = False, False


    def on_attribute_changed(self, attribute, old_value):
        assert attribute == 'status'
        logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id,
                     self.status)


    def is_meta_host_entry(self):
        'True if this entry has a meta_host instead of a host.'
        return self.host is None and self.meta_host is not None


    # This code is shared between rpc_interface and models.HostQueueEntry.
    # Sadly due to circular imports between the 2 (crbug.com/230100) making it
    # a class method was the best way to refactor it. Attempting to put it in
    # rpc_utils or a new utils module failed as that would require us to import
    # models.py but to call it from here we would have to import the utils.py
    # thus creating a cycle.
    @classmethod
    def abort_host_queue_entries(cls, host_queue_entries):
        """Aborts a collection of host_queue_entries.

        Abort these host queue entries and all host queue entries of jobs
        created by them.

        @param host_queue_entries: List of host queue entries we want to abort.
        """
        # This isn't completely immune to race conditions since it's not atomic,
        # but it should be safe given the scheduler's behavior.

        # TODO(milleral): crbug.com/230100
        # The |abort_host_queue_entries| rpc does nearly exactly this,
        # however, trying to re-use the code generates some horrible
        # circular import error.  It'd be nice to refactor things around
        # sometime so the code could be reused.

        # Fixpoint algorithm to find the whole tree of HQEs to abort to
        # minimize the total number of database queries:
        children = set()
        new_children = set(host_queue_entries)
        while new_children:
            children.update(new_children)
            new_child_ids = [hqe.job_id for hqe in new_children]
            new_children = HostQueueEntry.objects.filter(
                    job__parent_job__in=new_child_ids,
                    complete=False, aborted=False).all()
            # To handle circular parental relationships
            new_children = set(new_children) - children

        # Associate a user with the host queue entries that we're about
        # to abort so that we can look up who to blame for the aborts.
        now = datetime.now()
        user = User.current_user()
        aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe,
                aborted_by=user, aborted_on=now) for hqe in children]
        AbortedHostQueueEntry.objects.bulk_create(aborted_hqes)
        # Bulk update all of the HQEs to set the abort bit.
        child_ids = [hqe.id for hqe in children]
        HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True)


    def abort(self):
        """ Aborts this host queue entry.

        Abort this host queue entry and all host queue entries of jobs created
        by this one.

        """
        if not self.complete and not self.aborted:
            HostQueueEntry.abort_host_queue_entries([self])


    @classmethod
    def compute_full_status(cls, status, aborted, complete):
        """Returns a modified status msg if the host queue entry was aborted.

        @param cls: Implicit class object.
        @param status: The original status message.
        @param aborted: Whether the host queue entry was aborted.
        @param complete: Whether the host queue entry was completed.
        """
        if aborted and not complete:
            return 'Aborted (%s)' % status
        return status


    def full_status(self):
        """Returns the full status of this host queue entry, as a string."""
        return self.compute_full_status(self.status, self.aborted,
                                        self.complete)


    def _postprocess_object_dict(self, object_dict):
        # Expose the computed full status alongside the raw fields.
        object_dict['full_status'] = self.full_status()


    class Meta:
        """Metadata for class HostQueueEntry."""
        db_table = 'afe_host_queue_entries'



    def __unicode__(self):
        hostname = None
        if self.host:
            hostname = self.host.hostname
        return u"%s/%d (%d)" % (hostname, self.job.id, self.id)


class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
    """Represents an aborted host queue entry."""
    queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True)
    aborted_by = dbmodels.ForeignKey(User)
    aborted_on = dbmodels.DateTimeField()

    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        # Always stamp the abort time at save, ignoring any preset value.
        self.aborted_on = datetime.now()
        super(AbortedHostQueueEntry, self).save(*args, **kwargs)

    class Meta:
        """Metadata for class AbortedHostQueueEntry."""
        db_table = 'afe_aborted_host_queue_entries'


class RecurringRun(dbmodels.Model, model_logic.ModelExtensions):
    """\
    job: job to use as a template
    owner: owner of the instantiated template
    start_date: Run the job at scheduled date
    loop_period: Re-run (loop) the job periodically
                 (in every loop_period seconds)
    loop_count: Re-run (loop) count
    """

    job = dbmodels.ForeignKey(Job)
    owner = dbmodels.ForeignKey(User)
    start_date = dbmodels.DateTimeField()
    loop_period = dbmodels.IntegerField(blank=True)
    loop_count = dbmodels.IntegerField(blank=True)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class RecurringRun."""
        db_table = 'afe_recurring_run'

    def __unicode__(self):
        return u'RecurringRun(job %s, start %s, period %s, count %s)' % (
            self.job.id, self.start_date, self.loop_period, self.loop_count)


class SpecialTask(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Tasks to run on hosts at the next time they are in the Ready state. Use this
    for high-priority tasks, such as forced repair or forced reinstall.

    host: host to run this task on
    task: special task to run
    time_requested: date and time the request for this task was made
    is_active: task is currently running
    is_complete: task has finished running
    is_aborted: task was aborted
    time_started: date and time the task started
    time_finished: date and time the task finished
    queue_entry: Host queue entry waiting on this task (or None, if task was not
                 started in preparation of a job)
    """
    Task = enum.Enum('Verify', 'Cleanup', 'Repair', 'Reset', 'Provision',
                     string_values=True)

    host = dbmodels.ForeignKey(Host, blank=False, null=False)
    task = dbmodels.CharField(max_length=64, choices=Task.choices(),
                              blank=False, null=False)
    requested_by = dbmodels.ForeignKey(User)
    time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False,
                                            null=False)
    is_active = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_complete = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_started = dbmodels.DateTimeField(null=True, blank=True)
    queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True)
    success = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_finished = dbmodels.DateTimeField(null=True, blank=True)

    objects = model_logic.ExtendedManager()


    def save(self, **kwargs):
1851 if self.queue_entry: 1852 self.requested_by = User.objects.get( 1853 login=self.queue_entry.job.owner) 1854 super(SpecialTask, self).save(**kwargs) 1855 1856 1857 def execution_path(self): 1858 """Get the execution path of the SpecialTask. 1859 1860 This method returns different paths depending on where a 1861 the task ran: 1862 * Master: hosts/hostname/task_id-task_type 1863 * Shard: Master_path/time_created 1864 This is to work around the fact that a shard can fail independent 1865 of the master, and be replaced by another shard that has the same 1866 hosts. Without the time_created stamp the logs of the tasks running 1867 on the second shard will clobber the logs from the first in google 1868 storage, because task ids are not globally unique. 1869 1870 @return: An execution path for the task. 1871 """ 1872 results_path = 'hosts/%s/%s-%s' % (self.host.hostname, self.id, 1873 self.task.lower()) 1874 1875 # If we do this on the master it will break backward compatibility, 1876 # as there are tasks that currently don't have timestamps. If a host 1877 # or job has been sent to a shard, the rpc for that host/job will 1878 # be redirected to the shard, so this global_config check will happen 1879 # on the shard the logs are on. 1880 is_shard = global_config.global_config.get_config_value( 1881 'SHARD', 'shard_hostname', type=str, default='') 1882 if not is_shard: 1883 return results_path 1884 1885 # Generate a uid to disambiguate special task result directories 1886 # in case this shard fails. The simplest uid is the job_id, however 1887 # in rare cases tasks do not have jobs associated with them (eg: 1888 # frontend verify), so just use the creation timestamp. The clocks 1889 # between a shard and master should always be in sync. Any discrepancies 1890 # will be brought to our attention in the form of job timeouts. 1891 uid = self.time_requested.strftime('%Y%d%m%H%M%S') 1892 1893 # TODO: This is a hack, however it is the easiest way to achieve 1894 # correctness. 
There is currently some debate over the future of 1895 # tasks in our infrastructure and refactoring everything right 1896 # now isn't worth the time. 1897 return '%s/%s' % (results_path, uid) 1898 1899 1900 # property to emulate HostQueueEntry.status 1901 @property 1902 def status(self): 1903 """ 1904 Return a host queue entry status appropriate for this task. Although 1905 SpecialTasks are not HostQueueEntries, it is helpful to the user to 1906 present similar statuses. 1907 """ 1908 if self.is_complete: 1909 if self.success: 1910 return HostQueueEntry.Status.COMPLETED 1911 return HostQueueEntry.Status.FAILED 1912 if self.is_active: 1913 return HostQueueEntry.Status.RUNNING 1914 return HostQueueEntry.Status.QUEUED 1915 1916 1917 # property to emulate HostQueueEntry.started_on 1918 @property 1919 def started_on(self): 1920 """Returns the time at which this special task started.""" 1921 return self.time_started 1922 1923 1924 @classmethod 1925 def schedule_special_task(cls, host, task): 1926 """Schedules a special task on a host if not already scheduled. 1927 1928 @param cls: Implicit class object. 1929 @param host: The host to use. 1930 @param task: The task to schedule. 1931 """ 1932 existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task, 1933 is_active=False, 1934 is_complete=False) 1935 if existing_tasks: 1936 return existing_tasks[0] 1937 1938 special_task = SpecialTask(host=host, task=task, 1939 requested_by=User.current_user()) 1940 special_task.save() 1941 return special_task 1942 1943 1944 def abort(self): 1945 """ Abort this special task.""" 1946 self.is_aborted = True 1947 self.save() 1948 1949 1950 def activate(self): 1951 """ 1952 Sets a task as active and sets the time started to the current time. 1953 """ 1954 logging.info('Starting: %s', self) 1955 self.is_active = True 1956 self.time_started = datetime.now() 1957 self.save() 1958 1959 1960 def finish(self, success): 1961 """Sets a task as completed. 
1962 1963 @param success: Whether or not the task was successful. 1964 """ 1965 logging.info('Finished: %s', self) 1966 self.is_active = False 1967 self.is_complete = True 1968 self.success = success 1969 if self.time_started: 1970 self.time_finished = datetime.now() 1971 self.save() 1972 1973 1974 class Meta: 1975 """Metadata for class SpecialTask.""" 1976 db_table = 'afe_special_tasks' 1977 1978 1979 def __unicode__(self): 1980 result = u'Special Task %s (host %s, task %s, time %s)' % ( 1981 self.id, self.host, self.task, self.time_requested) 1982 if self.is_complete: 1983 result += u' (completed)' 1984 elif self.is_active: 1985 result += u' (active)' 1986 1987 return result 1988 1989 1990class StableVersion(dbmodels.Model, model_logic.ModelExtensions): 1991 1992 board = dbmodels.CharField(max_length=255, unique=True) 1993 version = dbmodels.CharField(max_length=255) 1994 1995 class Meta: 1996 """Metadata for class StableVersion.""" 1997 db_table = 'afe_stable_versions'