models.py revision 06a4b52b21f663027da8a2b64c552b63224896d0
1# pylint: disable-msg=C0111 2 3import logging, os 4from datetime import datetime 5import django.core 6try: 7 from django.db import models as dbmodels, connection 8except django.core.exceptions.ImproperlyConfigured: 9 raise ImportError('Django database not yet configured. Import either ' 10 'setup_django_environment or ' 11 'setup_django_lite_environment from ' 12 'autotest_lib.frontend before any imports that ' 13 'depend on django models.') 14from xml.sax import saxutils 15import common 16from autotest_lib.frontend.afe import model_logic, model_attributes 17from autotest_lib.frontend.afe import rdb_model_extensions 18from autotest_lib.frontend import settings, thread_local 19from autotest_lib.client.common_lib import enum, error, host_protections 20from autotest_lib.client.common_lib import global_config 21from autotest_lib.client.common_lib import host_queue_entry_states 22from autotest_lib.client.common_lib import control_data, priorities, decorators 23from autotest_lib.client.common_lib import site_utils 24from autotest_lib.client.common_lib.cros.graphite import autotest_es 25 26# job options and user preferences 27DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY 28DEFAULT_REBOOT_AFTER = model_attributes.RebootBefore.NEVER 29 30 31class AclAccessViolation(Exception): 32 """\ 33 Raised when an operation is attempted with proper permissions as 34 dictated by ACLs. 35 """ 36 37 38class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model): 39 """\ 40 An atomic group defines a collection of hosts which must only be scheduled 41 all at once. Any host with a label having an atomic group will only be 42 scheduled for a job at the same time as other hosts sharing that label. 43 44 Required: 45 name: A name for this atomic group, e.g. 'rack23' or 'funky_net'. 46 max_number_of_machines: The maximum number of machines that will be 47 scheduled at once when scheduling jobs to this atomic group. 48 The job.synch_count is considered the minimum. 49 50 Optional: 51 description: Arbitrary text description of this group's purpose. 52 """ 53 name = dbmodels.CharField(max_length=255, unique=True) 54 description = dbmodels.TextField(blank=True) 55 # This magic value is the default to simplify the scheduler logic. 56 # It must be "large". The common use of atomic groups is to want all 57 # machines in the group to be used, limits on which subset used are 58 # often chosen via dependency labels. 59 # TODO(dennisjeffrey): Revisit this so we don't have to assume that 60 # "infinity" is around 3.3 million. 61 INFINITE_MACHINES = 333333333 62 max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES) 63 invalid = dbmodels.BooleanField(default=False, 64 editable=settings.FULL_ADMIN) 65 66 name_field = 'name' 67 objects = model_logic.ModelWithInvalidManager() 68 valid_objects = model_logic.ValidObjectsManager() 69 70 71 def enqueue_job(self, job, is_template=False): 72 """Enqueue a job on an associated atomic group of hosts. 73 74 @param job: A job to enqueue. 75 @param is_template: Whether the status should be "Template". 76 """ 77 queue_entry = HostQueueEntry.create(atomic_group=self, job=job, 78 is_template=is_template) 79 queue_entry.save() 80 81 82 def clean_object(self): 83 self.label_set.clear() 84 85 86 class Meta: 87 """Metadata for class AtomicGroup.""" 88 db_table = 'afe_atomic_groups' 89 90 91 def __unicode__(self): 92 return unicode(self.name) 93 94 95class Label(model_logic.ModelWithInvalid, dbmodels.Model): 96 """\ 97 Required: 98 name: label name 99 100 Optional: 101 kernel_config: URL/path to kernel config for jobs run on this label. 102 platform: If True, this is a platform label (defaults to False). 103 only_if_needed: If True, a Host with this label can only be used if that 104 label is requested by the job/test (either as the meta_host or 105 in the job_dependencies). 106 atomic_group: The atomic group associated with this label. 107 """ 108 name = dbmodels.CharField(max_length=255, unique=True) 109 kernel_config = dbmodels.CharField(max_length=255, blank=True) 110 platform = dbmodels.BooleanField(default=False) 111 invalid = dbmodels.BooleanField(default=False, 112 editable=settings.FULL_ADMIN) 113 only_if_needed = dbmodels.BooleanField(default=False) 114 115 name_field = 'name' 116 objects = model_logic.ModelWithInvalidManager() 117 valid_objects = model_logic.ValidObjectsManager() 118 atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True) 119 120 121 def clean_object(self): 122 self.host_set.clear() 123 self.test_set.clear() 124 125 126 def enqueue_job(self, job, atomic_group=None, is_template=False): 127 """Enqueue a job on any host of this label. 128 129 @param job: A job to enqueue. 130 @param atomic_group: The associated atomic group. 131 @param is_template: Whether the status should be "Template". 132 """ 133 queue_entry = HostQueueEntry.create(meta_host=self, job=job, 134 is_template=is_template, 135 atomic_group=atomic_group) 136 queue_entry.save() 137 138 139 140 class Meta: 141 """Metadata for class Label.""" 142 db_table = 'afe_labels' 143 144 145 def __unicode__(self): 146 return unicode(self.name) 147 148 149class Shard(dbmodels.Model, model_logic.ModelExtensions): 150 151 hostname = dbmodels.CharField(max_length=255, unique=True) 152 153 name_field = 'hostname' 154 155 labels = dbmodels.ManyToManyField(Label, blank=True, 156 db_table='afe_shards_labels') 157 158 class Meta: 159 """Metadata for class ParameterizedJob.""" 160 db_table = 'afe_shards' 161 162 163 def rpc_hostname(self): 164 """Get the rpc hostname of the shard. 165 166 @return: Just the shard hostname for all non-testing environments. 167 The address of the default gateway for vm testing environments. 168 """ 169 # TODO: Figure out a better solution for testing. Since no 2 shards 170 # can run on the same host, if the shard hostname is localhost we 171 # conclude that it must be a vm in a test cluster. In such situations 172 # a name of localhost:<port> is necessary to achieve the correct 173 # afe links/redirection from the frontend (this happens through the 174 # host), but for rpcs that are performed *on* the shard, they need to 175 # use the address of the gateway. 176 hostname = self.hostname.split(':')[0] 177 if site_utils.is_localhost(hostname): 178 return self.hostname.replace( 179 hostname, site_utils.DEFAULT_VM_GATEWAY) 180 return self.hostname 181 182 183class Drone(dbmodels.Model, model_logic.ModelExtensions): 184 """ 185 A scheduler drone 186 187 hostname: the drone's hostname 188 """ 189 hostname = dbmodels.CharField(max_length=255, unique=True) 190 191 name_field = 'hostname' 192 objects = model_logic.ExtendedManager() 193 194 195 def save(self, *args, **kwargs): 196 if not User.current_user().is_superuser(): 197 raise Exception('Only superusers may edit drones') 198 super(Drone, self).save(*args, **kwargs) 199 200 201 def delete(self): 202 if not User.current_user().is_superuser(): 203 raise Exception('Only superusers may delete drones') 204 super(Drone, self).delete() 205 206 207 class Meta: 208 """Metadata for class Drone.""" 209 db_table = 'afe_drones' 210 211 def __unicode__(self): 212 return unicode(self.hostname) 213 214 215class DroneSet(dbmodels.Model, model_logic.ModelExtensions): 216 """ 217 A set of scheduler drones 218 219 These will be used by the scheduler to decide what drones a job is allowed 220 to run on. 221 222 name: the drone set's name 223 drones: the drones that are part of the set 224 """ 225 DRONE_SETS_ENABLED = global_config.global_config.get_config_value( 226 'SCHEDULER', 'drone_sets_enabled', type=bool, default=False) 227 DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value( 228 'SCHEDULER', 'default_drone_set_name', default=None) 229 230 name = dbmodels.CharField(max_length=255, unique=True) 231 drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones') 232 233 name_field = 'name' 234 objects = model_logic.ExtendedManager() 235 236 237 def save(self, *args, **kwargs): 238 if not User.current_user().is_superuser(): 239 raise Exception('Only superusers may edit drone sets') 240 super(DroneSet, self).save(*args, **kwargs) 241 242 243 def delete(self): 244 if not User.current_user().is_superuser(): 245 raise Exception('Only superusers may delete drone sets') 246 super(DroneSet, self).delete() 247 248 249 @classmethod 250 def drone_sets_enabled(cls): 251 """Returns whether drone sets are enabled. 252 253 @param cls: Implicit class object. 254 """ 255 return cls.DRONE_SETS_ENABLED 256 257 258 @classmethod 259 def default_drone_set_name(cls): 260 """Returns the default drone set name. 261 262 @param cls: Implicit class object. 263 """ 264 return cls.DEFAULT_DRONE_SET_NAME 265 266 267 @classmethod 268 def get_default(cls): 269 """Gets the default drone set name, compatible with Job.add_object. 270 271 @param cls: Implicit class object. 272 """ 273 return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME) 274 275 276 @classmethod 277 def resolve_name(cls, drone_set_name): 278 """ 279 Returns the name of one of these, if not None, in order of preference: 280 1) the drone set given, 281 2) the current user's default drone set, or 282 3) the global default drone set 283 284 or returns None if drone sets are disabled 285 286 @param cls: Implicit class object. 287 @param drone_set_name: A drone set name. 288 """ 289 if not cls.drone_sets_enabled(): 290 return None 291 292 user = User.current_user() 293 user_drone_set_name = user.drone_set and user.drone_set.name 294 295 return drone_set_name or user_drone_set_name or cls.get_default().name 296 297 298 def get_drone_hostnames(self): 299 """ 300 Gets the hostnames of all drones in this drone set 301 """ 302 return set(self.drones.all().values_list('hostname', flat=True)) 303 304 305 class Meta: 306 """Metadata for class DroneSet.""" 307 db_table = 'afe_drone_sets' 308 309 def __unicode__(self): 310 return unicode(self.name) 311 312 313class User(dbmodels.Model, model_logic.ModelExtensions): 314 """\ 315 Required: 316 login :user login name 317 318 Optional: 319 access_level: 0=User (default), 1=Admin, 100=Root 320 """ 321 ACCESS_ROOT = 100 322 ACCESS_ADMIN = 1 323 ACCESS_USER = 0 324 325 AUTOTEST_SYSTEM = 'autotest_system' 326 327 login = dbmodels.CharField(max_length=255, unique=True) 328 access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True) 329 330 # user preferences 331 reboot_before = dbmodels.SmallIntegerField( 332 choices=model_attributes.RebootBefore.choices(), blank=True, 333 default=DEFAULT_REBOOT_BEFORE) 334 reboot_after = dbmodels.SmallIntegerField( 335 choices=model_attributes.RebootAfter.choices(), blank=True, 336 default=DEFAULT_REBOOT_AFTER) 337 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 338 show_experimental = dbmodels.BooleanField(default=False) 339 340 name_field = 'login' 341 objects = model_logic.ExtendedManager() 342 343 344 def save(self, *args, **kwargs): 345 # is this a new object being saved for the first time? 346 first_time = (self.id is None) 347 user = thread_local.get_user() 348 if user and not user.is_superuser() and user.login != self.login: 349 raise AclAccessViolation("You cannot modify user " + self.login) 350 super(User, self).save(*args, **kwargs) 351 if first_time: 352 everyone = AclGroup.objects.get(name='Everyone') 353 everyone.users.add(self) 354 355 356 def is_superuser(self): 357 """Returns whether the user has superuser access.""" 358 return self.access_level >= self.ACCESS_ROOT 359 360 361 @classmethod 362 def current_user(cls): 363 """Returns the current user. 364 365 @param cls: Implicit class object. 366 """ 367 user = thread_local.get_user() 368 if user is None: 369 user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM) 370 user.access_level = cls.ACCESS_ROOT 371 user.save() 372 return user 373 374 375 @classmethod 376 def get_record(cls, data): 377 """Check the database for an identical record. 378 379 Check for a record with matching id and login. If one exists, 380 return it. If one does not exist there is a possibility that 381 the following cases have happened: 382 1. Same id, different login 383 We received: "1 chromeos-test" 384 And we have: "1 debug-user" 385 In this case we need to delete "1 debug_user" and insert 386 "1 chromeos-test". 387 388 2. Same login, different id: 389 We received: "1 chromeos-test" 390 And we have: "2 chromeos-test" 391 In this case we need to delete "2 chromeos-test" and insert 392 "1 chromeos-test". 393 394 As long as this method deletes bad records and raises the 395 DoesNotExist exception the caller will handle creating the 396 new record. 397 398 @raises: DoesNotExist, if a record with the matching login and id 399 does not exist. 400 """ 401 402 # Both the id and login should be uniqe but there are cases when 403 # we might already have a user with the same login/id because 404 # current_user will proactively create a user record if it doesn't 405 # exist. Since we want to avoid conflict between the master and 406 # shard, just delete any existing user records that don't match 407 # what we're about to deserialize from the master. 408 try: 409 return cls.objects.get(login=data['login'], id=data['id']) 410 except cls.DoesNotExist: 411 cls.delete_matching_record(login=data['login']) 412 cls.delete_matching_record(id=data['id']) 413 raise 414 415 416 class Meta: 417 """Metadata for class User.""" 418 db_table = 'afe_users' 419 420 def __unicode__(self): 421 return unicode(self.login) 422 423 424class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel, 425 model_logic.ModelWithAttributes): 426 """\ 427 Required: 428 hostname 429 430 optional: 431 locked: if true, host is locked and will not be queued 432 433 Internal: 434 From AbstractHostModel: 435 synch_id: currently unused 436 status: string describing status of host 437 invalid: true if the host has been deleted 438 protection: indicates what can be done to this host during repair 439 lock_time: DateTime at which the host was locked 440 dirty: true if the host has been used without being rebooted 441 Local: 442 locked_by: user that locked the host, or null if the host is unlocked 443 """ 444 445 SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set', 446 'hostattribute_set', 447 'labels', 448 'shard']) 449 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid']) 450 451 452 def custom_deserialize_relation(self, link, data): 453 assert link == 'shard', 'Link %s should not be deserialized' % link 454 self.shard = Shard.deserialize(data) 455 456 457 # Note: Only specify foreign keys here, specify all native host columns in 458 # rdb_model_extensions instead. 459 Protection = host_protections.Protection 460 labels = dbmodels.ManyToManyField(Label, blank=True, 461 db_table='afe_hosts_labels') 462 locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False) 463 name_field = 'hostname' 464 objects = model_logic.ModelWithInvalidManager() 465 valid_objects = model_logic.ValidObjectsManager() 466 leased_objects = model_logic.LeasedHostManager() 467 468 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 469 470 def __init__(self, *args, **kwargs): 471 super(Host, self).__init__(*args, **kwargs) 472 self._record_attributes(['status']) 473 474 475 @staticmethod 476 def create_one_time_host(hostname): 477 """Creates a one-time host. 478 479 @param hostname: The name for the host. 480 """ 481 query = Host.objects.filter(hostname=hostname) 482 if query.count() == 0: 483 host = Host(hostname=hostname, invalid=True) 484 host.do_validate() 485 else: 486 host = query[0] 487 if not host.invalid: 488 raise model_logic.ValidationError({ 489 'hostname' : '%s already exists in the autotest DB. ' 490 'Select it rather than entering it as a one time ' 491 'host.' % hostname 492 }) 493 host.protection = host_protections.Protection.DO_NOT_REPAIR 494 host.locked = False 495 host.save() 496 host.clean_object() 497 return host 498 499 500 @classmethod 501 def assign_to_shard(cls, shard, known_ids): 502 """Assigns hosts to a shard. 503 504 For all labels that have been assigned to this shard, all hosts that 505 have this label, are assigned to this shard. 506 507 Hosts that are assigned to the shard but aren't already present on the 508 shard are returned. 509 510 @param shard: The shard object to assign labels/hosts for. 511 @param known_ids: List of all host-ids the shard already knows. 512 This is used to figure out which hosts should be sent 513 to the shard. If shard_ids were used instead, hosts 514 would only be transferred once, even if the client 515 failed persisting them. 516 The number of hosts usually lies in O(100), so the 517 overhead is acceptable. 518 519 @returns the hosts objects that should be sent to the shard. 520 """ 521 522 # Disclaimer: concurrent heartbeats should theoretically not occur in 523 # the current setup. As they may be introduced in the near future, 524 # this comment will be left here. 525 526 # Sending stuff twice is acceptable, but forgetting something isn't. 527 # Detecting duplicates on the client is easy, but here it's harder. The 528 # following options were considered: 529 # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more 530 # than select returned, as concurrently more hosts might have been 531 # inserted 532 # - UPDATE and then SELECT WHERE shard=shard: select always returns all 533 # hosts for the shard, this is overhead 534 # - SELECT and then UPDATE only selected without requerying afterwards: 535 # returns the old state of the records. 536 host_ids = list(Host.objects.filter( 537 labels=shard.labels.all(), 538 leased=False 539 ).exclude( 540 id__in=known_ids, 541 ).values_list('pk', flat=True)) 542 543 if host_ids: 544 Host.objects.filter(pk__in=host_ids).update(shard=shard) 545 return list(Host.objects.filter(pk__in=host_ids).all()) 546 return [] 547 548 def resurrect_object(self, old_object): 549 super(Host, self).resurrect_object(old_object) 550 # invalid hosts can be in use by the scheduler (as one-time hosts), so 551 # don't change the status 552 self.status = old_object.status 553 554 555 def clean_object(self): 556 self.aclgroup_set.clear() 557 self.labels.clear() 558 559 560 def record_state(self, type_str, state, value, other_metadata=None): 561 """Record metadata in elasticsearch. 562 563 @param type_str: sets the _type field in elasticsearch db. 564 @param state: string representing what state we are recording, 565 e.g. 'locked' 566 @param value: value of the state, e.g. True 567 @param other_metadata: Other metadata to store in metaDB. 568 """ 569 metadata = { 570 state: value, 571 'hostname': self.hostname, 572 } 573 if other_metadata: 574 metadata = dict(metadata.items() + other_metadata.items()) 575 autotest_es.post(type_str=type_str, metadata=metadata) 576 577 578 def save(self, *args, **kwargs): 579 # extra spaces in the hostname can be a sneaky source of errors 580 self.hostname = self.hostname.strip() 581 # is this a new object being saved for the first time? 582 first_time = (self.id is None) 583 if not first_time: 584 AclGroup.check_for_acl_violation_hosts([self]) 585 # If locked is changed, send its status and user made the change to 586 # metaDB. Locks are important in host history because if a device is 587 # locked then we don't really care what state it is in. 588 if self.locked and not self.locked_by: 589 self.locked_by = User.current_user() 590 self.lock_time = datetime.now() 591 self.record_state('lock_history', 'locked', self.locked, 592 {'changed_by': self.locked_by.login}) 593 self.dirty = True 594 elif not self.locked and self.locked_by: 595 self.record_state('lock_history', 'locked', self.locked, 596 {'changed_by': self.locked_by.login}) 597 self.locked_by = None 598 self.lock_time = None 599 super(Host, self).save(*args, **kwargs) 600 if first_time: 601 everyone = AclGroup.objects.get(name='Everyone') 602 everyone.hosts.add(self) 603 self._check_for_updated_attributes() 604 605 606 def delete(self): 607 AclGroup.check_for_acl_violation_hosts([self]) 608 for queue_entry in self.hostqueueentry_set.all(): 609 queue_entry.deleted = True 610 queue_entry.abort() 611 super(Host, self).delete() 612 613 614 def on_attribute_changed(self, attribute, old_value): 615 assert attribute == 'status' 616 logging.info(self.hostname + ' -> ' + self.status) 617 618 619 def enqueue_job(self, job, atomic_group=None, is_template=False): 620 """Enqueue a job on this host. 621 622 @param job: A job to enqueue. 623 @param atomic_group: The associated atomic group. 624 @param is_template: Whther the status should be "Template". 625 """ 626 queue_entry = HostQueueEntry.create(host=self, job=job, 627 is_template=is_template, 628 atomic_group=atomic_group) 629 # allow recovery of dead hosts from the frontend 630 if not self.active_queue_entry() and self.is_dead(): 631 self.status = Host.Status.READY 632 self.save() 633 queue_entry.save() 634 635 block = IneligibleHostQueue(job=job, host=self) 636 block.save() 637 638 639 def platform(self): 640 """The platform of the host.""" 641 # TODO(showard): slighly hacky? 642 platforms = self.labels.filter(platform=True) 643 if len(platforms) == 0: 644 return None 645 return platforms[0] 646 platform.short_description = 'Platform' 647 648 649 @classmethod 650 def check_no_platform(cls, hosts): 651 """Verify the specified hosts have no associated platforms. 652 653 @param cls: Implicit class object. 654 @param hosts: The hosts to verify. 655 @raises model_logic.ValidationError if any hosts already have a 656 platform. 657 """ 658 Host.objects.populate_relationships(hosts, Label, 'label_list') 659 errors = [] 660 for host in hosts: 661 platforms = [label.name for label in host.label_list 662 if label.platform] 663 if platforms: 664 # do a join, just in case this host has multiple platforms, 665 # we'll be able to see it 666 errors.append('Host %s already has a platform: %s' % ( 667 host.hostname, ', '.join(platforms))) 668 if errors: 669 raise model_logic.ValidationError({'labels': '; '.join(errors)}) 670 671 672 def is_dead(self): 673 """Returns whether the host is dead (has status repair failed).""" 674 return self.status == Host.Status.REPAIR_FAILED 675 676 677 def active_queue_entry(self): 678 """Returns the active queue entry for this host, or None if none.""" 679 active = list(self.hostqueueentry_set.filter(active=True)) 680 if not active: 681 return None 682 assert len(active) == 1, ('More than one active entry for ' 683 'host ' + self.hostname) 684 return active[0] 685 686 687 def _get_attribute_model_and_args(self, attribute): 688 return HostAttribute, dict(host=self, attribute=attribute) 689 690 691 @classmethod 692 def get_attribute_model(cls): 693 """Return the attribute model. 694 695 Override method in parent class. See ModelExtensions for details. 696 @returns: The attribute model of Host. 697 """ 698 return HostAttribute 699 700 701 class Meta: 702 """Metadata for the Host class.""" 703 db_table = 'afe_hosts' 704 705 706 def __unicode__(self): 707 return unicode(self.hostname) 708 709 710class HostAttribute(dbmodels.Model, model_logic.ModelExtensions): 711 """Arbitrary keyvals associated with hosts.""" 712 713 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 714 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 715 host = dbmodels.ForeignKey(Host) 716 attribute = dbmodels.CharField(max_length=90) 717 value = dbmodels.CharField(max_length=300) 718 719 objects = model_logic.ExtendedManager() 720 721 class Meta: 722 """Metadata for the HostAttribute class.""" 723 db_table = 'afe_host_attributes' 724 725 726 @classmethod 727 def get_record(cls, data): 728 """Check the database for an identical record. 729 730 Use host_id and attribute to search for a existing record. 731 732 @raises: DoesNotExist, if no record found 733 @raises: MultipleObjectsReturned if multiple records found. 734 """ 735 # TODO(fdeng): We should use host_id and attribute together as 736 # a primary key in the db. 737 return cls.objects.get(host_id=data['host_id'], 738 attribute=data['attribute']) 739 740 741 @classmethod 742 def deserialize(cls, data): 743 """Override deserialize in parent class. 744 745 Do not deserialize id as id is not kept consistent on master and shards. 746 747 @param data: A dictionary of data to deserialize. 748 749 @returns: A HostAttribute object. 750 """ 751 if data: 752 data.pop('id') 753 return super(HostAttribute, cls).deserialize(data) 754 755 756class Test(dbmodels.Model, model_logic.ModelExtensions): 757 """\ 758 Required: 759 author: author name 760 description: description of the test 761 name: test name 762 time: short, medium, long 763 test_class: This describes the class for your the test belongs in. 764 test_category: This describes the category for your tests 765 test_type: Client or Server 766 path: path to pass to run_test() 767 sync_count: is a number >=1 (1 being the default). If it's 1, then it's an 768 async job. If it's >1 it's sync job for that number of machines 769 i.e. if sync_count = 2 it is a sync job that requires two 770 machines. 771 Optional: 772 dependencies: What the test requires to run. Comma deliminated list 773 dependency_labels: many-to-many relationship with labels corresponding to 774 test dependencies. 775 experimental: If this is set to True production servers will ignore the test 776 run_verify: Whether or not the scheduler should run the verify stage 777 run_reset: Whether or not the scheduler should run the reset stage 778 test_retry: Number of times to retry test if the test did not complete 779 successfully. (optional, default: 0) 780 """ 781 TestTime = enum.Enum('SHORT', 'MEDIUM', 'LONG', start_value=1) 782 783 name = dbmodels.CharField(max_length=255, unique=True) 784 author = dbmodels.CharField(max_length=255) 785 test_class = dbmodels.CharField(max_length=255) 786 test_category = dbmodels.CharField(max_length=255) 787 dependencies = dbmodels.CharField(max_length=255, blank=True) 788 description = dbmodels.TextField(blank=True) 789 experimental = dbmodels.BooleanField(default=True) 790 run_verify = dbmodels.BooleanField(default=False) 791 test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(), 792 default=TestTime.MEDIUM) 793 test_type = dbmodels.SmallIntegerField( 794 choices=control_data.CONTROL_TYPE.choices()) 795 sync_count = dbmodels.IntegerField(default=1) 796 path = dbmodels.CharField(max_length=255, unique=True) 797 test_retry = dbmodels.IntegerField(blank=True, default=0) 798 run_reset = dbmodels.BooleanField(default=True) 799 800 dependency_labels = ( 801 dbmodels.ManyToManyField(Label, blank=True, 802 db_table='afe_autotests_dependency_labels')) 803 name_field = 'name' 804 objects = model_logic.ExtendedManager() 805 806 807 def admin_description(self): 808 """Returns a string representing the admin description.""" 809 escaped_description = saxutils.escape(self.description) 810 return '<span style="white-space:pre">%s</span>' % escaped_description 811 admin_description.allow_tags = True 812 admin_description.short_description = 'Description' 813 814 815 class Meta: 816 """Metadata for class Test.""" 817 db_table = 'afe_autotests' 818 819 def __unicode__(self): 820 return unicode(self.name) 821 822 823class TestParameter(dbmodels.Model): 824 """ 825 A declared parameter of a test 826 """ 827 test = dbmodels.ForeignKey(Test) 828 name = dbmodels.CharField(max_length=255) 829 830 class Meta: 831 """Metadata for class TestParameter.""" 832 db_table = 'afe_test_parameters' 833 unique_together = ('test', 'name') 834 835 def __unicode__(self): 836 return u'%s (%s)' % (self.name, self.test.name) 837 838 839class Profiler(dbmodels.Model, model_logic.ModelExtensions): 840 """\ 841 Required: 842 name: profiler name 843 test_type: Client or Server 844 845 Optional: 846 description: arbirary text description 847 """ 848 name = dbmodels.CharField(max_length=255, unique=True) 849 description = dbmodels.TextField(blank=True) 850 851 name_field = 'name' 852 objects = model_logic.ExtendedManager() 853 854 855 class Meta: 856 """Metadata for class Profiler.""" 857 db_table = 'afe_profilers' 858 859 def __unicode__(self): 860 return unicode(self.name) 861 862 863class AclGroup(dbmodels.Model, model_logic.ModelExtensions): 864 """\ 865 Required: 866 name: name of ACL group 867 868 Optional: 869 description: arbitrary description of group 870 """ 871 872 SERIALIZATION_LINKS_TO_FOLLOW = set(['users']) 873 874 name = dbmodels.CharField(max_length=255, unique=True) 875 description = dbmodels.CharField(max_length=255, blank=True) 876 users = dbmodels.ManyToManyField(User, blank=False, 877 db_table='afe_acl_groups_users') 878 hosts = dbmodels.ManyToManyField(Host, blank=True, 879 db_table='afe_acl_groups_hosts') 880 881 name_field = 'name' 882 objects = model_logic.ExtendedManager() 883 884 @staticmethod 885 def check_for_acl_violation_hosts(hosts): 886 """Verify the current user has access to the specified hosts. 887 888 @param hosts: The hosts to verify against. 889 @raises AclAccessViolation if the current user doesn't have access 890 to a host. 891 """ 892 user = User.current_user() 893 if user.is_superuser(): 894 return 895 accessible_host_ids = set( 896 host.id for host in Host.objects.filter(aclgroup__users=user)) 897 for host in hosts: 898 # Check if the user has access to this host, 899 # but only if it is not a metahost or a one-time-host. 900 no_access = (isinstance(host, Host) 901 and not host.invalid 902 and int(host.id) not in accessible_host_ids) 903 if no_access: 904 raise AclAccessViolation("%s does not have access to %s" % 905 (str(user), str(host))) 906 907 908 @staticmethod 909 def check_abort_permissions(queue_entries): 910 """Look for queue entries that aren't abortable by the current user. 911 912 An entry is not abortable if: 913 * the job isn't owned by this user, and 914 * the machine isn't ACL-accessible, or 915 * the machine is in the "Everyone" ACL 916 917 @param queue_entries: The queue entries to check. 918 @raises AclAccessViolation if a queue entry is not abortable by the 919 current user. 920 """ 921 user = User.current_user() 922 if user.is_superuser(): 923 return 924 not_owned = queue_entries.exclude(job__owner=user.login) 925 # I do this using ID sets instead of just Django filters because 926 # filtering on M2M dbmodels is broken in Django 0.96. It's better in 927 # 1.0. 928 # TODO: Use Django filters, now that we're using 1.0. 929 accessible_ids = set( 930 entry.id for entry 931 in not_owned.filter(host__aclgroup__users__login=user.login)) 932 public_ids = set(entry.id for entry 933 in not_owned.filter(host__aclgroup__name='Everyone')) 934 cannot_abort = [entry for entry in not_owned.select_related() 935 if entry.id not in accessible_ids 936 or entry.id in public_ids] 937 if len(cannot_abort) == 0: 938 return 939 entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner, 940 entry.host_or_metahost_name()) 941 for entry in cannot_abort) 942 raise AclAccessViolation('You cannot abort the following job entries: ' 943 + entry_names) 944 945 946 def check_for_acl_violation_acl_group(self): 947 """Verifies the current user has acces to this ACL group. 948 949 @raises AclAccessViolation if the current user doesn't have access to 950 this ACL group. 951 """ 952 user = User.current_user() 953 if user.is_superuser(): 954 return 955 if self.name == 'Everyone': 956 raise AclAccessViolation("You cannot modify 'Everyone'!") 957 if not user in self.users.all(): 958 raise AclAccessViolation("You do not have access to %s" 959 % self.name) 960 961 @staticmethod 962 def on_host_membership_change(): 963 """Invoked when host membership changes.""" 964 everyone = AclGroup.objects.get(name='Everyone') 965 966 # find hosts that aren't in any ACL group and add them to Everyone 967 # TODO(showard): this is a bit of a hack, since the fact that this query 968 # works is kind of a coincidence of Django internals. This trick 969 # doesn't work in general (on all foreign key relationships). I'll 970 # replace it with a better technique when the need arises. 971 orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True) 972 everyone.hosts.add(*orphaned_hosts.distinct()) 973 974 # find hosts in both Everyone and another ACL group, and remove them 975 # from Everyone 976 hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone') 977 acled_hosts = set() 978 for host in hosts_in_everyone: 979 # Has an ACL group other than Everyone 980 if host.aclgroup_set.count() > 1: 981 acled_hosts.add(host) 982 everyone.hosts.remove(*acled_hosts) 983 984 985 def delete(self): 986 if (self.name == 'Everyone'): 987 raise AclAccessViolation("You cannot delete 'Everyone'!") 988 self.check_for_acl_violation_acl_group() 989 super(AclGroup, self).delete() 990 self.on_host_membership_change() 991 992 993 def add_current_user_if_empty(self): 994 """Adds the current user if the set of users is empty.""" 995 if not self.users.count(): 996 self.users.add(User.current_user()) 997 998 999 def perform_after_save(self, change): 1000 """Called after a save. 1001 1002 @param change: Whether there was a change. 1003 """ 1004 if not change: 1005 self.users.add(User.current_user()) 1006 self.add_current_user_if_empty() 1007 self.on_host_membership_change() 1008 1009 1010 def save(self, *args, **kwargs): 1011 change = bool(self.id) 1012 if change: 1013 # Check the original object for an ACL violation 1014 AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group() 1015 super(AclGroup, self).save(*args, **kwargs) 1016 self.perform_after_save(change) 1017 1018 1019 class Meta: 1020 """Metadata for class AclGroup.""" 1021 db_table = 'afe_acl_groups' 1022 1023 def __unicode__(self): 1024 return unicode(self.name) 1025 1026 1027class Kernel(dbmodels.Model): 1028 """ 1029 A kernel configuration for a parameterized job 1030 """ 1031 version = dbmodels.CharField(max_length=255) 1032 cmdline = dbmodels.CharField(max_length=255, blank=True) 1033 1034 @classmethod 1035 def create_kernels(cls, kernel_list): 1036 """Creates all kernels in the kernel list. 1037 1038 @param cls: Implicit class object. 1039 @param kernel_list: A list of dictionaries that describe the kernels, 1040 in the same format as the 'kernel' argument to 1041 rpc_interface.generate_control_file. 1042 @return A list of the created kernels. 1043 """ 1044 if not kernel_list: 1045 return None 1046 return [cls._create(kernel) for kernel in kernel_list] 1047 1048 1049 @classmethod 1050 def _create(cls, kernel_dict): 1051 version = kernel_dict.pop('version') 1052 cmdline = kernel_dict.pop('cmdline', '') 1053 1054 if kernel_dict: 1055 raise Exception('Extraneous kernel arguments remain: %r' 1056 % kernel_dict) 1057 1058 kernel, _ = cls.objects.get_or_create(version=version, 1059 cmdline=cmdline) 1060 return kernel 1061 1062 1063 class Meta: 1064 """Metadata for class Kernel.""" 1065 db_table = 'afe_kernels' 1066 unique_together = ('version', 'cmdline') 1067 1068 def __unicode__(self): 1069 return u'%s %s' % (self.version, self.cmdline) 1070 1071 1072class ParameterizedJob(dbmodels.Model): 1073 """ 1074 Auxiliary configuration for a parameterized job. 1075 """ 1076 test = dbmodels.ForeignKey(Test) 1077 label = dbmodels.ForeignKey(Label, null=True) 1078 use_container = dbmodels.BooleanField(default=False) 1079 profile_only = dbmodels.BooleanField(default=False) 1080 upload_kernel_config = dbmodels.BooleanField(default=False) 1081 1082 kernels = dbmodels.ManyToManyField( 1083 Kernel, db_table='afe_parameterized_job_kernels') 1084 profilers = dbmodels.ManyToManyField( 1085 Profiler, through='ParameterizedJobProfiler') 1086 1087 1088 @classmethod 1089 def smart_get(cls, id_or_name, *args, **kwargs): 1090 """For compatibility with Job.add_object. 1091 1092 @param cls: Implicit class object. 1093 @param id_or_name: The ID or name to get. 1094 @param args: Non-keyword arguments. 1095 @param kwargs: Keyword arguments. 1096 """ 1097 return cls.objects.get(pk=id_or_name) 1098 1099 1100 def job(self): 1101 """Returns the job if it exists, or else None.""" 1102 jobs = self.job_set.all() 1103 assert jobs.count() <= 1 1104 return jobs and jobs[0] or None 1105 1106 1107 class Meta: 1108 """Metadata for class ParameterizedJob.""" 1109 db_table = 'afe_parameterized_jobs' 1110 1111 def __unicode__(self): 1112 return u'%s (parameterized) - %s' % (self.test.name, self.job()) 1113 1114 1115class ParameterizedJobProfiler(dbmodels.Model): 1116 """ 1117 A profiler to run on a parameterized job 1118 """ 1119 parameterized_job = dbmodels.ForeignKey(ParameterizedJob) 1120 profiler = dbmodels.ForeignKey(Profiler) 1121 1122 class Meta: 1123 """Metedata for class ParameterizedJobProfiler.""" 1124 db_table = 'afe_parameterized_jobs_profilers' 1125 unique_together = ('parameterized_job', 'profiler') 1126 1127 1128class ParameterizedJobProfilerParameter(dbmodels.Model): 1129 """ 1130 A parameter for a profiler in a parameterized job 1131 """ 1132 parameterized_job_profiler = dbmodels.ForeignKey(ParameterizedJobProfiler) 1133 parameter_name = dbmodels.CharField(max_length=255) 1134 parameter_value = dbmodels.TextField() 1135 parameter_type = dbmodels.CharField( 1136 max_length=8, choices=model_attributes.ParameterTypes.choices()) 1137 1138 class Meta: 1139 """Metadata for class ParameterizedJobProfilerParameter.""" 1140 db_table = 'afe_parameterized_job_profiler_parameters' 1141 unique_together = ('parameterized_job_profiler', 'parameter_name') 1142 1143 def __unicode__(self): 1144 return u'%s - %s' % (self.parameterized_job_profiler.profiler.name, 1145 self.parameter_name) 1146 1147 1148class ParameterizedJobParameter(dbmodels.Model): 1149 """ 1150 Parameters for a parameterized job 1151 """ 1152 parameterized_job = dbmodels.ForeignKey(ParameterizedJob) 1153 test_parameter = dbmodels.ForeignKey(TestParameter) 1154 parameter_value = dbmodels.TextField() 1155 parameter_type = dbmodels.CharField( 1156 max_length=8, choices=model_attributes.ParameterTypes.choices()) 1157 1158 class Meta: 1159 """Metadata for class ParameterizedJobParameter.""" 1160 db_table = 'afe_parameterized_job_parameters' 1161 unique_together = ('parameterized_job', 'test_parameter') 1162 1163 def __unicode__(self): 1164 return u'%s - %s' % (self.parameterized_job.job().name, 1165 self.test_parameter.name) 1166 1167 1168class JobManager(model_logic.ExtendedManager): 1169 'Custom manager to provide efficient status counts querying.' 1170 def get_status_counts(self, job_ids): 1171 """Returns a dict mapping the given job IDs to their status count dicts. 1172 1173 @param job_ids: A list of job IDs. 1174 """ 1175 if not job_ids: 1176 return {} 1177 id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids) 1178 cursor = connection.cursor() 1179 cursor.execute(""" 1180 SELECT job_id, status, aborted, complete, COUNT(*) 1181 FROM afe_host_queue_entries 1182 WHERE job_id IN %s 1183 GROUP BY job_id, status, aborted, complete 1184 """ % id_list) 1185 all_job_counts = dict((job_id, {}) for job_id in job_ids) 1186 for job_id, status, aborted, complete, count in cursor.fetchall(): 1187 job_dict = all_job_counts[job_id] 1188 full_status = HostQueueEntry.compute_full_status(status, aborted, 1189 complete) 1190 job_dict.setdefault(full_status, 0) 1191 job_dict[full_status] += count 1192 return all_job_counts 1193 1194 1195class Job(dbmodels.Model, model_logic.ModelExtensions): 1196 """\ 1197 owner: username of job owner 1198 name: job name (does not have to be unique) 1199 priority: Integer priority value. Higher is more important. 1200 control_file: contents of control file 1201 control_type: Client or Server 1202 created_on: date of job creation 1203 submitted_on: date of job submission 1204 synch_count: how many hosts should be used per autoserv execution 1205 run_verify: Whether or not to run the verify phase 1206 run_reset: Whether or not to run the reset phase 1207 timeout: DEPRECATED - hours from queuing time until job times out 1208 timeout_mins: minutes from job queuing time until the job times out 1209 max_runtime_hrs: DEPRECATED - hours from job starting time until job 1210 times out 1211 max_runtime_mins: minutes from job starting time until job times out 1212 email_list: list of people to email on completion delimited by any of: 1213 white space, ',', ':', ';' 1214 dependency_labels: many-to-many relationship with labels corresponding to 1215 job dependencies 1216 reboot_before: Never, If dirty, or Always 1217 reboot_after: Never, If all tests passed, or Always 1218 parse_failed_repair: if True, a failed repair launched by this job will have 1219 its results parsed as part of the job. 1220 drone_set: The set of drones to run this job on 1221 parent_job: Parent job (optional) 1222 test_retry: Number of times to retry test if the test did not complete 1223 successfully. (optional, default: 0) 1224 require_ssp: Require server-side packaging unless require_ssp is set to 1225 False. (optional, default: None) 1226 """ 1227 1228 # TODO: Investigate, if jobkeyval_set is really needed. 1229 # dynamic_suite will write them into an attached file for the drone, but 1230 # it doesn't seem like they are actually used. If they aren't used, remove 1231 # jobkeyval_set here. 1232 SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels', 1233 'hostqueueentry_set', 1234 'jobkeyval_set', 1235 'shard']) 1236 1237 1238 def _deserialize_relation(self, link, data): 1239 if link in ['hostqueueentry_set', 'jobkeyval_set']: 1240 for obj in data: 1241 obj['job_id'] = self.id 1242 1243 super(Job, self)._deserialize_relation(link, data) 1244 1245 1246 def custom_deserialize_relation(self, link, data): 1247 assert link == 'shard', 'Link %s should not be deserialized' % link 1248 self.shard = Shard.deserialize(data) 1249 1250 1251 def sanity_check_update_from_shard(self, shard, updated_serialized): 1252 # If the job got aborted on the master after the client fetched it 1253 # no shard_id will be set. The shard might still push updates though, 1254 # as the job might complete before the abort bit syncs to the shard. 1255 # Alternative considered: The master scheduler could be changed to not 1256 # set aborted jobs to completed that are sharded out. But that would 1257 # require database queries and seemed more complicated to implement. 1258 # This seems safe to do, as there won't be updates pushed from the wrong 1259 # shards should be powered off and wiped hen they are removed from the 1260 # master. 1261 if self.shard_id and self.shard_id != shard.id: 1262 raise error.UnallowedRecordsSentToMaster( 1263 'Job id=%s is assigned to shard (%s). Cannot update it with %s ' 1264 'from shard %s.' % (self.id, self.shard_id, updated_serialized, 1265 shard.id)) 1266 1267 1268 # TIMEOUT is deprecated. 1269 DEFAULT_TIMEOUT = global_config.global_config.get_config_value( 1270 'AUTOTEST_WEB', 'job_timeout_default', default=24) 1271 DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value( 1272 'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60) 1273 # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is 1274 # completed. 1275 DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value( 1276 'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72) 1277 DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value( 1278 'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60) 1279 DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value( 1280 'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool, 1281 default=False) 1282 1283 owner = dbmodels.CharField(max_length=255) 1284 name = dbmodels.CharField(max_length=255) 1285 priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT) 1286 control_file = dbmodels.TextField(null=True, blank=True) 1287 control_type = dbmodels.SmallIntegerField( 1288 choices=control_data.CONTROL_TYPE.choices(), 1289 blank=True, # to allow 0 1290 default=control_data.CONTROL_TYPE.CLIENT) 1291 created_on = dbmodels.DateTimeField() 1292 synch_count = dbmodels.IntegerField(blank=True, default=0) 1293 timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT) 1294 run_verify = dbmodels.BooleanField(default=False) 1295 email_list = dbmodels.CharField(max_length=250, blank=True) 1296 dependency_labels = ( 1297 dbmodels.ManyToManyField(Label, blank=True, 1298 db_table='afe_jobs_dependency_labels')) 1299 reboot_before = dbmodels.SmallIntegerField( 1300 choices=model_attributes.RebootBefore.choices(), blank=True, 1301 default=DEFAULT_REBOOT_BEFORE) 1302 reboot_after = dbmodels.SmallIntegerField( 1303 choices=model_attributes.RebootAfter.choices(), blank=True, 1304 default=DEFAULT_REBOOT_AFTER) 1305 parse_failed_repair = dbmodels.BooleanField( 1306 default=DEFAULT_PARSE_FAILED_REPAIR) 1307 # max_runtime_hrs is deprecated. Will be removed after switch to mins is 1308 # completed. 1309 max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS) 1310 max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS) 1311 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 1312 1313 parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True, 1314 blank=True) 1315 1316 parent_job = dbmodels.ForeignKey('self', blank=True, null=True) 1317 1318 test_retry = dbmodels.IntegerField(blank=True, default=0) 1319 1320 run_reset = dbmodels.BooleanField(default=True) 1321 1322 timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS) 1323 1324 # If this is None on the master, a slave should be found. 1325 # If this is None on a slave, it should be synced back to the master 1326 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 1327 1328 # If this is None, server-side packaging will be used for server side test, 1329 # unless it's disabled in global config AUTOSERV/enable_ssp_container. 1330 require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True) 1331 1332 # custom manager 1333 objects = JobManager() 1334 1335 1336 @decorators.cached_property 1337 def labels(self): 1338 """All the labels of this job""" 1339 # We need to convert dependency_labels to a list, because all() gives us 1340 # back an iterator, and storing/caching an iterator means we'd only be 1341 # able to read from it once. 1342 return list(self.dependency_labels.all()) 1343 1344 1345 def is_server_job(self): 1346 """Returns whether this job is of type server.""" 1347 return self.control_type == control_data.CONTROL_TYPE.SERVER 1348 1349 1350 @classmethod 1351 def parameterized_jobs_enabled(cls): 1352 """Returns whether parameterized jobs are enabled. 1353 1354 @param cls: Implicit class object. 1355 """ 1356 return global_config.global_config.get_config_value( 1357 'AUTOTEST_WEB', 'parameterized_jobs', type=bool) 1358 1359 1360 @classmethod 1361 def check_parameterized_job(cls, control_file, parameterized_job): 1362 """Checks that the job is valid given the global config settings. 1363 1364 First, either control_file must be set, or parameterized_job must be 1365 set, but not both. Second, parameterized_job must be set if and only if 1366 the parameterized_jobs option in the global config is set to True. 1367 1368 @param cls: Implict class object. 1369 @param control_file: A control file. 1370 @param parameterized_job: A parameterized job. 1371 """ 1372 if not (bool(control_file) ^ bool(parameterized_job)): 1373 raise Exception('Job must have either control file or ' 1374 'parameterization, but not both') 1375 1376 parameterized_jobs_enabled = cls.parameterized_jobs_enabled() 1377 if control_file and parameterized_jobs_enabled: 1378 raise Exception('Control file specified, but parameterized jobs ' 1379 'are enabled') 1380 if parameterized_job and not parameterized_jobs_enabled: 1381 raise Exception('Parameterized job specified, but parameterized ' 1382 'jobs are not enabled') 1383 1384 1385 @classmethod 1386 def create(cls, owner, options, hosts): 1387 """Creates a job. 1388 1389 The job is created by taking some information (the listed args) and 1390 filling in the rest of the necessary information. 1391 1392 @param cls: Implicit class object. 1393 @param owner: The owner for the job. 1394 @param options: An options object. 1395 @param hosts: The hosts to use. 1396 """ 1397 AclGroup.check_for_acl_violation_hosts(hosts) 1398 1399 control_file = options.get('control_file') 1400 parameterized_job = options.get('parameterized_job') 1401 1402 # The current implementation of parameterized jobs requires that only 1403 # control files or parameterized jobs are used. Using the image 1404 # parameter on autoupdate_ParameterizedJob doesn't mix pure 1405 # parameterized jobs and control files jobs, it does muck enough with 1406 # normal jobs by adding a parameterized id to them that this check will 1407 # fail. So for now we just skip this check. 1408 # cls.check_parameterized_job(control_file=control_file, 1409 # parameterized_job=parameterized_job) 1410 user = User.current_user() 1411 if options.get('reboot_before') is None: 1412 options['reboot_before'] = user.get_reboot_before_display() 1413 if options.get('reboot_after') is None: 1414 options['reboot_after'] = user.get_reboot_after_display() 1415 1416 drone_set = DroneSet.resolve_name(options.get('drone_set')) 1417 1418 if options.get('timeout_mins') is None and options.get('timeout'): 1419 options['timeout_mins'] = options['timeout'] * 60 1420 1421 job = cls.add_object( 1422 owner=owner, 1423 name=options['name'], 1424 priority=options['priority'], 1425 control_file=control_file, 1426 control_type=options['control_type'], 1427 synch_count=options.get('synch_count'), 1428 # timeout needs to be deleted in the future. 1429 timeout=options.get('timeout'), 1430 timeout_mins=options.get('timeout_mins'), 1431 max_runtime_mins=options.get('max_runtime_mins'), 1432 run_verify=options.get('run_verify'), 1433 email_list=options.get('email_list'), 1434 reboot_before=options.get('reboot_before'), 1435 reboot_after=options.get('reboot_after'), 1436 parse_failed_repair=options.get('parse_failed_repair'), 1437 created_on=datetime.now(), 1438 drone_set=drone_set, 1439 parameterized_job=parameterized_job, 1440 parent_job=options.get('parent_job_id'), 1441 test_retry=options.get('test_retry'), 1442 run_reset=options.get('run_reset'), 1443 require_ssp=options.get('require_ssp')) 1444 1445 job.dependency_labels = options['dependencies'] 1446 1447 if options.get('keyvals'): 1448 for key, value in options['keyvals'].iteritems(): 1449 JobKeyval.objects.create(job=job, key=key, value=value) 1450 1451 return job 1452 1453 1454 @classmethod 1455 def _add_filters_for_shard_assignment(cls, query, known_ids): 1456 """Exclude jobs that should be not sent to shard. 1457 1458 This is a helper that filters out the following jobs: 1459 - Non-aborted jobs known to shard as specified in |known_ids|. 1460 Note for jobs aborted on master, even if already known to shard, 1461 will be sent to shard again so that shard can abort them. 1462 - Completed jobs 1463 - Active jobs 1464 @param query: A query that finds jobs for shards, to which the 'exclude' 1465 filters will be applied. 1466 @param known_ids: List of all ids of incomplete jobs, the shard already 1467 knows about. 1468 1469 @returns: A django QuerySet after filtering out unnecessary jobs. 1470 1471 """ 1472 return query.exclude( 1473 id__in=known_ids, 1474 hostqueueentry__aborted=False 1475 ).exclude( 1476 hostqueueentry__complete=True 1477 ).exclude( 1478 hostqueueentry__active=True) 1479 1480 1481 @classmethod 1482 def assign_to_shard(cls, shard, known_ids): 1483 """Assigns unassigned jobs to a shard. 1484 1485 For all labels that have been assigned to this shard, all jobs that 1486 have this label, are assigned to this shard. 1487 1488 Jobs that are assigned to the shard but aren't already present on the 1489 shard are returned. 1490 1491 @param shard: The shard to assign jobs to. 1492 @param known_ids: List of all ids of incomplete jobs, the shard already 1493 knows about. 1494 This is used to figure out which jobs should be sent 1495 to the shard. If shard_ids were used instead, jobs 1496 would only be transferred once, even if the client 1497 failed persisting them. 1498 The number of unfinished jobs usually lies in O(1000). 1499 Assuming one id takes 8 chars in the json, this means 1500 overhead that lies in the lower kilobyte range. 1501 A not in query with 5000 id's takes about 30ms. 1502 1503 @returns The job objects that should be sent to the shard. 1504 """ 1505 # Disclaimer: Concurrent heartbeats should not occur in today's setup. 1506 # If this changes or they are triggered manually, this applies: 1507 # Jobs may be returned more than once by concurrent calls of this 1508 # function, as there is a race condition between SELECT and UPDATE. 1509 query = Job.objects.filter( 1510 dependency_labels=shard.labels.all(), 1511 # If an HQE associated with a job is removed in some reasons, 1512 # such jobs should be excluded. Refer crbug.com/479766 1513 hostqueueentry__isnull=False 1514 ) 1515 query = cls._add_filters_for_shard_assignment(query, known_ids) 1516 job_ids = set(query.distinct().values_list('pk', flat=True)) 1517 1518 # Combine frontend jobs in the heartbeat. 1519 query = Job.objects.filter( 1520 hostqueueentry__meta_host__isnull=True, 1521 hostqueueentry__host__isnull=False, 1522 hostqueueentry__host__labels=shard.labels.all() 1523 ) 1524 query = cls._add_filters_for_shard_assignment(query, known_ids) 1525 job_ids |= set(query.distinct().values_list('pk', flat=True)) 1526 if job_ids: 1527 Job.objects.filter(pk__in=job_ids).update(shard=shard) 1528 return list(Job.objects.filter(pk__in=job_ids).all()) 1529 return [] 1530 1531 1532 def save(self, *args, **kwargs): 1533 # The current implementation of parameterized jobs requires that only 1534 # control files or parameterized jobs are used. Using the image 1535 # parameter on autoupdate_ParameterizedJob doesn't mix pure 1536 # parameterized jobs and control files jobs, it does muck enough with 1537 # normal jobs by adding a parameterized id to them that this check will 1538 # fail. So for now we just skip this check. 1539 # cls.check_parameterized_job(control_file=self.control_file, 1540 # parameterized_job=self.parameterized_job) 1541 super(Job, self).save(*args, **kwargs) 1542 1543 1544 def queue(self, hosts, atomic_group=None, is_template=False): 1545 """Enqueue a job on the given hosts. 1546 1547 @param hosts: The hosts to use. 1548 @param atomic_group: The associated atomic group. 1549 @param is_template: Whether the status should be "Template". 1550 """ 1551 if not hosts: 1552 if atomic_group: 1553 # No hosts or labels are required to queue an atomic group 1554 # Job. However, if they are given, we respect them below. 1555 atomic_group.enqueue_job(self, is_template=is_template) 1556 else: 1557 # hostless job 1558 entry = HostQueueEntry.create(job=self, is_template=is_template) 1559 entry.save() 1560 return 1561 1562 for host in hosts: 1563 host.enqueue_job(self, atomic_group=atomic_group, 1564 is_template=is_template) 1565 1566 1567 def create_recurring_job(self, start_date, loop_period, loop_count, owner): 1568 """Creates a recurring job. 1569 1570 @param start_date: The starting date of the job. 1571 @param loop_period: How often to re-run the job, in seconds. 1572 @param loop_count: The re-run count. 1573 @param owner: The owner of the job. 1574 """ 1575 rec = RecurringRun(job=self, start_date=start_date, 1576 loop_period=loop_period, 1577 loop_count=loop_count, 1578 owner=User.objects.get(login=owner)) 1579 rec.save() 1580 return rec.id 1581 1582 1583 def user(self): 1584 """Gets the user of this job, or None if it doesn't exist.""" 1585 try: 1586 return User.objects.get(login=self.owner) 1587 except self.DoesNotExist: 1588 return None 1589 1590 1591 def abort(self): 1592 """Aborts this job.""" 1593 for queue_entry in self.hostqueueentry_set.all(): 1594 queue_entry.abort() 1595 1596 1597 def tag(self): 1598 """Returns a string tag for this job.""" 1599 return '%s-%s' % (self.id, self.owner) 1600 1601 1602 def keyval_dict(self): 1603 """Returns all keyvals for this job as a dictionary.""" 1604 return dict((keyval.key, keyval.value) 1605 for keyval in self.jobkeyval_set.all()) 1606 1607 1608 @classmethod 1609 def get_attribute_model(cls): 1610 """Return the attribute model. 1611 1612 Override method in parent class. This class is called when 1613 deserializing the one-to-many relationship betwen Job and JobKeyval. 1614 On deserialization, we will try to clear any existing job keyvals 1615 associated with a job to avoid any inconsistency. 1616 Though Job doesn't implement ModelWithAttribute, we still treat 1617 it as an attribute model for this purpose. 1618 1619 @returns: The attribute model of Job. 1620 """ 1621 return JobKeyval 1622 1623 1624 class Meta: 1625 """Metadata for class Job.""" 1626 db_table = 'afe_jobs' 1627 1628 def __unicode__(self): 1629 return u'%s (%s-%s)' % (self.name, self.id, self.owner) 1630 1631 1632class JobKeyval(dbmodels.Model, model_logic.ModelExtensions): 1633 """Keyvals associated with jobs""" 1634 1635 SERIALIZATION_LINKS_TO_KEEP = set(['job']) 1636 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value']) 1637 1638 job = dbmodels.ForeignKey(Job) 1639 key = dbmodels.CharField(max_length=90) 1640 value = dbmodels.CharField(max_length=300) 1641 1642 objects = model_logic.ExtendedManager() 1643 1644 1645 @classmethod 1646 def get_record(cls, data): 1647 """Check the database for an identical record. 1648 1649 Use job_id and key to search for a existing record. 1650 1651 @raises: DoesNotExist, if no record found 1652 @raises: MultipleObjectsReturned if multiple records found. 1653 """ 1654 # TODO(fdeng): We should use job_id and key together as 1655 # a primary key in the db. 1656 return cls.objects.get(job_id=data['job_id'], key=data['key']) 1657 1658 1659 @classmethod 1660 def deserialize(cls, data): 1661 """Override deserialize in parent class. 1662 1663 Do not deserialize id as id is not kept consistent on master and shards. 1664 1665 @param data: A dictionary of data to deserialize. 1666 1667 @returns: A JobKeyval object. 1668 """ 1669 if data: 1670 data.pop('id') 1671 return super(JobKeyval, cls).deserialize(data) 1672 1673 1674 class Meta: 1675 """Metadata for class JobKeyval.""" 1676 db_table = 'afe_job_keyvals' 1677 1678 1679class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions): 1680 """Represents an ineligible host queue.""" 1681 job = dbmodels.ForeignKey(Job) 1682 host = dbmodels.ForeignKey(Host) 1683 1684 objects = model_logic.ExtendedManager() 1685 1686 class Meta: 1687 """Metadata for class IneligibleHostQueue.""" 1688 db_table = 'afe_ineligible_host_queues' 1689 1690 1691class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 1692 """Represents a host queue entry.""" 1693 1694 SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host']) 1695 SERIALIZATION_LINKS_TO_KEEP = set(['host']) 1696 SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted']) 1697 1698 1699 def custom_deserialize_relation(self, link, data): 1700 assert link == 'meta_host' 1701 self.meta_host = Label.deserialize(data) 1702 1703 1704 def sanity_check_update_from_shard(self, shard, updated_serialized, 1705 job_ids_sent): 1706 if self.job_id not in job_ids_sent: 1707 raise error.UnallowedRecordsSentToMaster( 1708 'Sent HostQueueEntry without corresponding ' 1709 'job entry: %s' % updated_serialized) 1710 1711 1712 Status = host_queue_entry_states.Status 1713 ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES 1714 COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES 1715 1716 job = dbmodels.ForeignKey(Job) 1717 host = dbmodels.ForeignKey(Host, blank=True, null=True) 1718 status = dbmodels.CharField(max_length=255) 1719 meta_host = dbmodels.ForeignKey(Label, blank=True, null=True, 1720 db_column='meta_host') 1721 active = dbmodels.BooleanField(default=False) 1722 complete = dbmodels.BooleanField(default=False) 1723 deleted = dbmodels.BooleanField(default=False) 1724 execution_subdir = dbmodels.CharField(max_length=255, blank=True, 1725 default='') 1726 # If atomic_group is set, this is a virtual HostQueueEntry that will 1727 # be expanded into many actual hosts within the group at schedule time. 1728 atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True) 1729 aborted = dbmodels.BooleanField(default=False) 1730 started_on = dbmodels.DateTimeField(null=True, blank=True) 1731 finished_on = dbmodels.DateTimeField(null=True, blank=True) 1732 1733 objects = model_logic.ExtendedManager() 1734 1735 1736 def __init__(self, *args, **kwargs): 1737 super(HostQueueEntry, self).__init__(*args, **kwargs) 1738 self._record_attributes(['status']) 1739 1740 1741 @classmethod 1742 def create(cls, job, host=None, meta_host=None, atomic_group=None, 1743 is_template=False): 1744 """Creates a new host queue entry. 1745 1746 @param cls: Implicit class object. 1747 @param job: The associated job. 1748 @param host: The associated host. 1749 @param meta_host: The associated meta host. 1750 @param atomic_group: The associated atomic group. 1751 @param is_template: Whether the status should be "Template". 1752 """ 1753 if is_template: 1754 status = cls.Status.TEMPLATE 1755 else: 1756 status = cls.Status.QUEUED 1757 1758 return cls(job=job, host=host, meta_host=meta_host, 1759 atomic_group=atomic_group, status=status) 1760 1761 1762 def save(self, *args, **kwargs): 1763 self._set_active_and_complete() 1764 super(HostQueueEntry, self).save(*args, **kwargs) 1765 self._check_for_updated_attributes() 1766 1767 1768 def execution_path(self): 1769 """ 1770 Path to this entry's results (relative to the base results directory). 1771 """ 1772 return os.path.join(self.job.tag(), self.execution_subdir) 1773 1774 1775 def host_or_metahost_name(self): 1776 """Returns the first non-None name found in priority order. 1777 1778 The priority order checked is: (1) host name; (2) meta host name; and 1779 (3) atomic group name. 1780 """ 1781 if self.host: 1782 return self.host.hostname 1783 elif self.meta_host: 1784 return self.meta_host.name 1785 else: 1786 assert self.atomic_group, "no host, meta_host or atomic group!" 1787 return self.atomic_group.name 1788 1789 1790 def _set_active_and_complete(self): 1791 if self.status in self.ACTIVE_STATUSES: 1792 self.active, self.complete = True, False 1793 elif self.status in self.COMPLETE_STATUSES: 1794 self.active, self.complete = False, True 1795 else: 1796 self.active, self.complete = False, False 1797 1798 1799 def on_attribute_changed(self, attribute, old_value): 1800 assert attribute == 'status' 1801 logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id, 1802 self.status) 1803 1804 1805 def is_meta_host_entry(self): 1806 'True if this is a entry has a meta_host instead of a host.' 1807 return self.host is None and self.meta_host is not None 1808 1809 1810 # This code is shared between rpc_interface and models.HostQueueEntry. 1811 # Sadly due to circular imports between the 2 (crbug.com/230100) making it 1812 # a class method was the best way to refactor it. Attempting to put it in 1813 # rpc_utils or a new utils module failed as that would require us to import 1814 # models.py but to call it from here we would have to import the utils.py 1815 # thus creating a cycle. 1816 @classmethod 1817 def abort_host_queue_entries(cls, host_queue_entries): 1818 """Aborts a collection of host_queue_entries. 1819 1820 Abort these host queue entry and all host queue entries of jobs created 1821 by them. 1822 1823 @param host_queue_entries: List of host queue entries we want to abort. 1824 """ 1825 # This isn't completely immune to race conditions since it's not atomic, 1826 # but it should be safe given the scheduler's behavior. 1827 1828 # TODO(milleral): crbug.com/230100 1829 # The |abort_host_queue_entries| rpc does nearly exactly this, 1830 # however, trying to re-use the code generates some horrible 1831 # circular import error. I'd be nice to refactor things around 1832 # sometime so the code could be reused. 1833 1834 # Fixpoint algorithm to find the whole tree of HQEs to abort to 1835 # minimize the total number of database queries: 1836 children = set() 1837 new_children = set(host_queue_entries) 1838 while new_children: 1839 children.update(new_children) 1840 new_child_ids = [hqe.job_id for hqe in new_children] 1841 new_children = HostQueueEntry.objects.filter( 1842 job__parent_job__in=new_child_ids, 1843 complete=False, aborted=False).all() 1844 # To handle circular parental relationships 1845 new_children = set(new_children) - children 1846 1847 # Associate a user with the host queue entries that we're about 1848 # to abort so that we can look up who to blame for the aborts. 1849 now = datetime.now() 1850 user = User.current_user() 1851 aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe, 1852 aborted_by=user, aborted_on=now) for hqe in children] 1853 AbortedHostQueueEntry.objects.bulk_create(aborted_hqes) 1854 # Bulk update all of the HQEs to set the abort bit. 1855 child_ids = [hqe.id for hqe in children] 1856 HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True) 1857 1858 1859 def abort(self): 1860 """ Aborts this host queue entry. 1861 1862 Abort this host queue entry and all host queue entries of jobs created by 1863 this one. 1864 1865 """ 1866 if not self.complete and not self.aborted: 1867 HostQueueEntry.abort_host_queue_entries([self]) 1868 1869 1870 @classmethod 1871 def compute_full_status(cls, status, aborted, complete): 1872 """Returns a modified status msg if the host queue entry was aborted. 1873 1874 @param cls: Implicit class object. 1875 @param status: The original status message. 1876 @param aborted: Whether the host queue entry was aborted. 1877 @param complete: Whether the host queue entry was completed. 1878 """ 1879 if aborted and not complete: 1880 return 'Aborted (%s)' % status 1881 return status 1882 1883 1884 def full_status(self): 1885 """Returns the full status of this host queue entry, as a string.""" 1886 return self.compute_full_status(self.status, self.aborted, 1887 self.complete) 1888 1889 1890 def _postprocess_object_dict(self, object_dict): 1891 object_dict['full_status'] = self.full_status() 1892 1893 1894 class Meta: 1895 """Metadata for class HostQueueEntry.""" 1896 db_table = 'afe_host_queue_entries' 1897 1898 1899 1900 def __unicode__(self): 1901 hostname = None 1902 if self.host: 1903 hostname = self.host.hostname 1904 return u"%s/%d (%d)" % (hostname, self.job.id, self.id) 1905 1906 1907class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 1908 """Represents an aborted host queue entry.""" 1909 queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True) 1910 aborted_by = dbmodels.ForeignKey(User) 1911 aborted_on = dbmodels.DateTimeField() 1912 1913 objects = model_logic.ExtendedManager() 1914 1915 1916 def save(self, *args, **kwargs): 1917 self.aborted_on = datetime.now() 1918 super(AbortedHostQueueEntry, self).save(*args, **kwargs) 1919 1920 class Meta: 1921 """Metadata for class AbortedHostQueueEntry.""" 1922 db_table = 'afe_aborted_host_queue_entries' 1923 1924 1925class RecurringRun(dbmodels.Model, model_logic.ModelExtensions): 1926 """\ 1927 job: job to use as a template 1928 owner: owner of the instantiated template 1929 start_date: Run the job at scheduled date 1930 loop_period: Re-run (loop) the job periodically 1931 (in every loop_period seconds) 1932 loop_count: Re-run (loop) count 1933 """ 1934 1935 job = dbmodels.ForeignKey(Job) 1936 owner = dbmodels.ForeignKey(User) 1937 start_date = dbmodels.DateTimeField() 1938 loop_period = dbmodels.IntegerField(blank=True) 1939 loop_count = dbmodels.IntegerField(blank=True) 1940 1941 objects = model_logic.ExtendedManager() 1942 1943 class Meta: 1944 """Metadata for class RecurringRun.""" 1945 db_table = 'afe_recurring_run' 1946 1947 def __unicode__(self): 1948 return u'RecurringRun(job %s, start %s, period %s, count %s)' % ( 1949 self.job.id, self.start_date, self.loop_period, self.loop_count) 1950 1951 1952class SpecialTask(dbmodels.Model, model_logic.ModelExtensions): 1953 """\ 1954 Tasks to run on hosts at the next time they are in the Ready state. Use this 1955 for high-priority tasks, such as forced repair or forced reinstall. 1956 1957 host: host to run this task on 1958 task: special task to run 1959 time_requested: date and time the request for this task was made 1960 is_active: task is currently running 1961 is_complete: task has finished running 1962 is_aborted: task was aborted 1963 time_started: date and time the task started 1964 time_finished: date and time the task finished 1965 queue_entry: Host queue entry waiting on this task (or None, if task was not 1966 started in preparation of a job) 1967 """ 1968 Task = enum.Enum('Verify', 'Cleanup', 'Repair', 'Reset', 'Provision', 1969 string_values=True) 1970 1971 host = dbmodels.ForeignKey(Host, blank=False, null=False) 1972 task = dbmodels.CharField(max_length=64, choices=Task.choices(), 1973 blank=False, null=False) 1974 requested_by = dbmodels.ForeignKey(User) 1975 time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False, 1976 null=False) 1977 is_active = dbmodels.BooleanField(default=False, blank=False, null=False) 1978 is_complete = dbmodels.BooleanField(default=False, blank=False, null=False) 1979 is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False) 1980 time_started = dbmodels.DateTimeField(null=True, blank=True) 1981 queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True) 1982 success = dbmodels.BooleanField(default=False, blank=False, null=False) 1983 time_finished = dbmodels.DateTimeField(null=True, blank=True) 1984 1985 objects = model_logic.ExtendedManager() 1986 1987 1988 def save(self, **kwargs): 1989 if self.queue_entry: 1990 self.requested_by = User.objects.get( 1991 login=self.queue_entry.job.owner) 1992 super(SpecialTask, self).save(**kwargs) 1993 1994 1995 def execution_path(self): 1996 """Get the execution path of the SpecialTask. 1997 1998 This method returns different paths depending on where a 1999 the task ran: 2000 * Master: hosts/hostname/task_id-task_type 2001 * Shard: Master_path/time_created 2002 This is to work around the fact that a shard can fail independent 2003 of the master, and be replaced by another shard that has the same 2004 hosts. Without the time_created stamp the logs of the tasks running 2005 on the second shard will clobber the logs from the first in google 2006 storage, because task ids are not globally unique. 2007 2008 @return: An execution path for the task. 2009 """ 2010 results_path = 'hosts/%s/%s-%s' % (self.host.hostname, self.id, 2011 self.task.lower()) 2012 2013 # If we do this on the master it will break backward compatibility, 2014 # as there are tasks that currently don't have timestamps. If a host 2015 # or job has been sent to a shard, the rpc for that host/job will 2016 # be redirected to the shard, so this global_config check will happen 2017 # on the shard the logs are on. 2018 is_shard = global_config.global_config.get_config_value( 2019 'SHARD', 'shard_hostname', type=str, default='') 2020 if not is_shard: 2021 return results_path 2022 2023 # Generate a uid to disambiguate special task result directories 2024 # in case this shard fails. The simplest uid is the job_id, however 2025 # in rare cases tasks do not have jobs associated with them (eg: 2026 # frontend verify), so just use the creation timestamp. The clocks 2027 # between a shard and master should always be in sync. Any discrepancies 2028 # will be brought to our attention in the form of job timeouts. 2029 uid = self.time_requested.strftime('%Y%d%m%H%M%S') 2030 2031 # TODO: This is a hack, however it is the easiest way to achieve 2032 # correctness. There is currently some debate over the future of 2033 # tasks in our infrastructure and refactoring everything right 2034 # now isn't worth the time. 2035 return '%s/%s' % (results_path, uid) 2036 2037 2038 # property to emulate HostQueueEntry.status 2039 @property 2040 def status(self): 2041 """ 2042 Return a host queue entry status appropriate for this task. Although 2043 SpecialTasks are not HostQueueEntries, it is helpful to the user to 2044 present similar statuses. 2045 """ 2046 if self.is_complete: 2047 if self.success: 2048 return HostQueueEntry.Status.COMPLETED 2049 return HostQueueEntry.Status.FAILED 2050 if self.is_active: 2051 return HostQueueEntry.Status.RUNNING 2052 return HostQueueEntry.Status.QUEUED 2053 2054 2055 # property to emulate HostQueueEntry.started_on 2056 @property 2057 def started_on(self): 2058 """Returns the time at which this special task started.""" 2059 return self.time_started 2060 2061 2062 @classmethod 2063 def schedule_special_task(cls, host, task): 2064 """Schedules a special task on a host if not already scheduled. 2065 2066 @param cls: Implicit class object. 2067 @param host: The host to use. 2068 @param task: The task to schedule. 2069 """ 2070 existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task, 2071 is_active=False, 2072 is_complete=False) 2073 if existing_tasks: 2074 return existing_tasks[0] 2075 2076 special_task = SpecialTask(host=host, task=task, 2077 requested_by=User.current_user()) 2078 special_task.save() 2079 return special_task 2080 2081 2082 def abort(self): 2083 """ Abort this special task.""" 2084 self.is_aborted = True 2085 self.save() 2086 2087 2088 def activate(self): 2089 """ 2090 Sets a task as active and sets the time started to the current time. 2091 """ 2092 logging.info('Starting: %s', self) 2093 self.is_active = True 2094 self.time_started = datetime.now() 2095 self.save() 2096 2097 2098 def finish(self, success): 2099 """Sets a task as completed. 2100 2101 @param success: Whether or not the task was successful. 2102 """ 2103 logging.info('Finished: %s', self) 2104 self.is_active = False 2105 self.is_complete = True 2106 self.success = success 2107 if self.time_started: 2108 self.time_finished = datetime.now() 2109 self.save() 2110 2111 2112 class Meta: 2113 """Metadata for class SpecialTask.""" 2114 db_table = 'afe_special_tasks' 2115 2116 2117 def __unicode__(self): 2118 result = u'Special Task %s (host %s, task %s, time %s)' % ( 2119 self.id, self.host, self.task, self.time_requested) 2120 if self.is_complete: 2121 result += u' (completed)' 2122 elif self.is_active: 2123 result += u' (active)' 2124 2125 return result 2126 2127 2128class StableVersion(dbmodels.Model, model_logic.ModelExtensions): 2129 2130 board = dbmodels.CharField(max_length=255, unique=True) 2131 version = dbmodels.CharField(max_length=255) 2132 2133 class Meta: 2134 """Metadata for class StableVersion.""" 2135 db_table = 'afe_stable_versions' 2136