models.py revision 8421d5905ab0aed8689c2eea6be8d9c4042ce618
# pylint: disable-msg=C0111

import logging, os
from datetime import datetime
import django.core
try:
    from django.db import models as dbmodels, connection
except django.core.exceptions.ImproperlyConfigured:
    raise ImportError('Django database not yet configured. Import either '
                      'setup_django_environment or '
                      'setup_django_lite_environment from '
                      'autotest_lib.frontend before any imports that '
                      'depend on django models.')
from xml.sax import saxutils
import common
from autotest_lib.frontend.afe import model_logic, model_attributes
from autotest_lib.frontend.afe import rdb_model_extensions
from autotest_lib.frontend import settings, thread_local
from autotest_lib.client.common_lib import enum, error, host_protections
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_queue_entry_states
from autotest_lib.client.common_lib import control_data, priorities, decorators

# job options and user preferences
DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY
# Fixed: this previously read RebootBefore.NEVER.  Both enums start at
# NEVER == 0, so the stored value is unchanged, but RebootAfter is the
# correct enum for a reboot-after default.
DEFAULT_REBOOT_AFTER = model_attributes.RebootAfter.NEVER


class AclAccessViolation(Exception):
    """\
    Raised when an operation is attempted without proper permissions as
    dictated by ACLs.
    """


class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model):
    """\
    An atomic group defines a collection of hosts which must only be scheduled
    all at once.  Any host with a label having an atomic group will only be
    scheduled for a job at the same time as other hosts sharing that label.

    Required:
      name: A name for this atomic group, e.g. 'rack23' or 'funky_net'.
      max_number_of_machines: The maximum number of machines that will be
              scheduled at once when scheduling jobs to this atomic group.
              The job.synch_count is considered the minimum.

    Optional:
      description: Arbitrary text description of this group's purpose.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)
    # This magic value is the default to simplify the scheduler logic.
    # It must be "large".  The common use of atomic groups is to want all
    # machines in the group to be used, limits on which subset used are
    # often chosen via dependency labels.
    # TODO(dennisjeffrey): Revisit this so we don't have to assume that
    # "infinity" is around 333 million.
    INFINITE_MACHINES = 333333333
    max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES)
    invalid = dbmodels.BooleanField(default=False,
                                    editable=settings.FULL_ADMIN)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on an associated atomic group of hosts.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        queue_entry = HostQueueEntry.create(atomic_group=self, job=job,
                                            is_template=is_template)
        queue_entry.save()


    def clean_object(self):
        # On invalidation, detach this group from any labels referencing it.
        self.label_set.clear()


    class Meta:
        """Metadata for class AtomicGroup."""
        db_table = 'afe_atomic_groups'


    def __unicode__(self):
        return unicode(self.name)


class Label(model_logic.ModelWithInvalid, dbmodels.Model):
    """\
    Required:
      name: label name

    Optional:
      kernel_config: URL/path to kernel config for jobs run on this label.
      platform: If True, this is a platform label (defaults to False).
      only_if_needed: If True, a Host with this label can only be used if that
              label is requested by the job/test (either as the meta_host or
              in the job_dependencies).
      atomic_group: The atomic group associated with this label.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    kernel_config = dbmodels.CharField(max_length=255, blank=True)
    platform = dbmodels.BooleanField(default=False)
    invalid = dbmodels.BooleanField(default=False,
                                    editable=settings.FULL_ADMIN)
    only_if_needed = dbmodels.BooleanField(default=False)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True)


    def clean_object(self):
        # On invalidation, detach this label from all hosts and tests.
        self.host_set.clear()
        self.test_set.clear()


    def enqueue_job(self, job, atomic_group=None, is_template=False):
        """Enqueue a job on any host of this label.

        @param job: A job to enqueue.
        @param atomic_group: The associated atomic group.
        @param is_template: Whether the status should be "Template".
        """
        queue_entry = HostQueueEntry.create(meta_host=self, job=job,
                                            is_template=is_template,
                                            atomic_group=atomic_group)
        queue_entry.save()


    class Meta:
        """Metadata for class Label."""
        db_table = 'afe_labels'

    def __unicode__(self):
        return unicode(self.name)


class Shard(dbmodels.Model, model_logic.ModelExtensions):
    """A scheduler shard, identified by its hostname."""

    hostname = dbmodels.CharField(max_length=255, unique=True)

    name_field = 'hostname'

    labels = dbmodels.ManyToManyField(Label, blank=True,
                                      db_table='afe_shards_labels')

    class Meta:
        """Metadata for class Shard."""
        db_table = 'afe_shards'


class Drone(dbmodels.Model, model_logic.ModelExtensions):
    """
    A scheduler drone

    hostname: the drone's hostname
    """
    hostname = dbmodels.CharField(max_length=255, unique=True)

    name_field = 'hostname'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save the drone; only superusers may edit drones."""
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may edit drones')
        super(Drone, self).save(*args, **kwargs)


    def delete(self):
        """Delete the drone; only superusers may delete drones."""
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may delete drones')
        super(Drone, self).delete()


    class Meta:
        """Metadata for class Drone."""
        db_table = 'afe_drones'

    def __unicode__(self):
        return unicode(self.hostname)


class DroneSet(dbmodels.Model, model_logic.ModelExtensions):
    """
    A set of scheduler drones

    These will be used by the scheduler to decide what drones a job is allowed
    to run on.

    name: the drone set's name
    drones: the drones that are part of the set
    """
    DRONE_SETS_ENABLED = global_config.global_config.get_config_value(
            'SCHEDULER', 'drone_sets_enabled', type=bool, default=False)
    DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value(
            'SCHEDULER', 'default_drone_set_name', default=None)

    name = dbmodels.CharField(max_length=255, unique=True)
    drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones')

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save the drone set; only superusers may edit drone sets."""
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may edit drone sets')
        super(DroneSet, self).save(*args, **kwargs)


    def delete(self):
        """Delete the drone set; only superusers may delete drone sets."""
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may delete drone sets')
        super(DroneSet, self).delete()


    @classmethod
    def drone_sets_enabled(cls):
        """Returns whether drone sets are enabled.

        @param cls: Implicit class object.
        """
        return cls.DRONE_SETS_ENABLED


    @classmethod
    def default_drone_set_name(cls):
        """Returns the default drone set name.

        @param cls: Implicit class object.
        """
        return cls.DEFAULT_DRONE_SET_NAME


    @classmethod
    def get_default(cls):
        """Gets the default drone set name, compatible with Job.add_object.

        @param cls: Implicit class object.
        """
        return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME)


    @classmethod
    def resolve_name(cls, drone_set_name):
        """
        Returns the name of one of these, if not None, in order of preference:
        1) the drone set given,
        2) the current user's default drone set, or
        3) the global default drone set

        or returns None if drone sets are disabled

        @param cls: Implicit class object.
        @param drone_set_name: A drone set name.
        """
        if not cls.drone_sets_enabled():
            return None

        user = User.current_user()
        user_drone_set_name = user.drone_set and user.drone_set.name

        return drone_set_name or user_drone_set_name or cls.get_default().name


    def get_drone_hostnames(self):
        """
        Gets the hostnames of all drones in this drone set
        """
        return set(self.drones.all().values_list('hostname', flat=True))


    class Meta:
        """Metadata for class DroneSet."""
        db_table = 'afe_drone_sets'

    def __unicode__(self):
        return unicode(self.name)


class User(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    login :user login name

    Optional:
    access_level: 0=User (default), 1=Admin, 100=Root
    """
    ACCESS_ROOT = 100
    ACCESS_ADMIN = 1
    ACCESS_USER = 0

    AUTOTEST_SYSTEM = 'autotest_system'

    login = dbmodels.CharField(max_length=255, unique=True)
    access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True)

    # user preferences
    reboot_before = dbmodels.SmallIntegerField(
            choices=model_attributes.RebootBefore.choices(), blank=True,
            default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
            choices=model_attributes.RebootAfter.choices(), blank=True,
            default=DEFAULT_REBOOT_AFTER)
    drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)
    show_experimental = dbmodels.BooleanField(default=False)

    name_field = 'login'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save the user; non-superusers may only modify themselves.

        Newly created users are automatically added to the 'Everyone' ACL
        group.
        """
        # is this a new object being saved for the first time?
        first_time = (self.id is None)
        user = thread_local.get_user()
        if user and not user.is_superuser() and user.login != self.login:
            raise AclAccessViolation("You cannot modify user " + self.login)
        super(User, self).save(*args, **kwargs)
        if first_time:
            everyone = AclGroup.objects.get(name='Everyone')
            everyone.users.add(self)


    def is_superuser(self):
        """Returns whether the user has superuser access."""
        return self.access_level >= self.ACCESS_ROOT


    @classmethod
    def current_user(cls):
        """Returns the current user.

        @param cls: Implicit class object.
        """
        user = thread_local.get_user()
        if user is None:
            # No thread-local user: fall back to (and lazily create) the
            # system account, which always has root access.
            user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM)
            user.access_level = cls.ACCESS_ROOT
            user.save()
        return user


    class Meta:
        """Metadata for class User."""
        db_table = 'afe_users'

    def __unicode__(self):
        return unicode(self.login)


class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel,
           model_logic.ModelWithAttributes):
    """\
    Required:
    hostname

    optional:
    locked: if true, host is locked and will not be queued

    Internal:
    From AbstractHostModel:
        synch_id: currently unused
        status: string describing status of host
        invalid: true if the host has been deleted
        protection: indicates what can be done to this host during repair
        lock_time: DateTime at which the host was locked
        dirty: true if the host has been used without being rebooted
    Local:
        locked_by: user that locked the host, or null if the host is unlocked
    """

    SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set',
                                         'hostattribute_set',
                                         'labels',
                                         'shard'])


    def custom_deserialize_relation(self, link, data):
        """Deserialize the 'shard' relation; no other link is supported."""
        assert link == 'shard', 'Link %s should not be deserialized' % link
        self.shard = Shard.deserialize(data)


    # Note: Only specify foreign keys here, specify all native host columns in
    # rdb_model_extensions instead.
    Protection = host_protections.Protection
    labels = dbmodels.ManyToManyField(Label, blank=True,
                                      db_table='afe_hosts_labels')
    locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False)
    name_field = 'hostname'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    leased_objects = model_logic.LeasedHostManager()

    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    def __init__(self, *args, **kwargs):
        super(Host, self).__init__(*args, **kwargs)
        # Track 'status' so on_attribute_changed fires when it changes.
        self._record_attributes(['status'])


    @staticmethod
    def create_one_time_host(hostname):
        """Creates a one-time host.

        @param hostname: The name for the host.
        """
        query = Host.objects.filter(hostname=hostname)
        if query.count() == 0:
            host = Host(hostname=hostname, invalid=True)
            host.do_validate()
        else:
            host = query[0]
            if not host.invalid:
                raise model_logic.ValidationError({
                    'hostname' : '%s already exists in the autotest DB.  '
                        'Select it rather than entering it as a one time '
                        'host.' % hostname
                    })
        host.protection = host_protections.Protection.DO_NOT_REPAIR
        host.locked = False
        host.save()
        host.clean_object()
        return host


    @classmethod
    def assign_to_shard(cls, shard):
        """Assigns hosts to a shard.

        This function will check which labels are associated with the given
        shard.  It will assign those hosts, that have labels that are assigned
        to the shard and haven't been returned to shard earlier.

        @param shard: The shard object to assign labels/hosts for.
        @returns the hosts objects that should be sent to the shard.
        """

        # Disclaimer: concurrent heartbeats should theoretically not occur in
        # the current setup. As they may be introduced in the near future,
        # this comment will be left here.

        # Sending stuff twice is acceptable, but forgetting something isn't.
        # Detecting duplicates on the client is easy, but here it's harder. The
        # following options were considered:
        # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more
        #   than select returned, as concurrently more hosts might have been
        #   inserted
        # - UPDATE and then SELECT WHERE shard=shard: select always returns all
        #   hosts for the shard, this is overhead
        # - SELECT and then UPDATE only selected without requerying afterwards:
        #   returns the old state of the records.
        host_ids = list(Host.objects.filter(
            shard=None,
            labels=shard.labels.all(),
            leased=False
            ).values_list('pk', flat=True))

        if host_ids:
            Host.objects.filter(pk__in=host_ids).update(shard=shard)
            return list(Host.objects.filter(pk__in=host_ids).all())
        return []

    def resurrect_object(self, old_object):
        super(Host, self).resurrect_object(old_object)
        # invalid hosts can be in use by the scheduler (as one-time hosts), so
        # don't change the status
        self.status = old_object.status


    def clean_object(self):
        # On invalidation, detach this host from ACL groups and labels.
        self.aclgroup_set.clear()
        self.labels.clear()


    def save(self, *args, **kwargs):
        """Save the host, maintaining lock bookkeeping and ACL membership."""
        # extra spaces in the hostname can be a sneaky source of errors
        self.hostname = self.hostname.strip()
        # is this a new object being saved for the first time?
        first_time = (self.id is None)
        if not first_time:
            AclGroup.check_for_acl_violation_hosts([self])
        # Keep locked_by/lock_time consistent with the locked flag.
        if self.locked and not self.locked_by:
            self.locked_by = User.current_user()
            self.lock_time = datetime.now()
            self.dirty = True
        elif not self.locked and self.locked_by:
            self.locked_by = None
            self.lock_time = None
        super(Host, self).save(*args, **kwargs)
        if first_time:
            everyone = AclGroup.objects.get(name='Everyone')
            everyone.hosts.add(self)
        self._check_for_updated_attributes()


    def delete(self):
        """Delete the host, aborting any of its queue entries first."""
        AclGroup.check_for_acl_violation_hosts([self])
        for queue_entry in self.hostqueueentry_set.all():
            queue_entry.deleted = True
            queue_entry.abort()
        super(Host, self).delete()


    def on_attribute_changed(self, attribute, old_value):
        assert attribute == 'status'
        logging.info(self.hostname + ' -> ' + self.status)


    def enqueue_job(self, job, atomic_group=None, is_template=False):
        """Enqueue a job on this host.

        @param job: A job to enqueue.
        @param atomic_group: The associated atomic group.
        @param is_template: Whether the status should be "Template".
        """
        queue_entry = HostQueueEntry.create(host=self, job=job,
                                            is_template=is_template,
                                            atomic_group=atomic_group)
        # allow recovery of dead hosts from the frontend
        if not self.active_queue_entry() and self.is_dead():
            self.status = Host.Status.READY
            self.save()
        queue_entry.save()

        block = IneligibleHostQueue(job=job, host=self)
        block.save()


    def platform(self):
        """The platform of the host."""
        # TODO(showard): slightly hacky?
        platforms = self.labels.filter(platform=True)
        if len(platforms) == 0:
            return None
        return platforms[0]
    platform.short_description = 'Platform'


    @classmethod
    def check_no_platform(cls, hosts):
        """Verify the specified hosts have no associated platforms.

        @param cls: Implicit class object.
        @param hosts: The hosts to verify.
        @raises model_logic.ValidationError if any hosts already have a
            platform.
        """
        Host.objects.populate_relationships(hosts, Label, 'label_list')
        errors = []
        for host in hosts:
            platforms = [label.name for label in host.label_list
                         if label.platform]
            if platforms:
                # do a join, just in case this host has multiple platforms,
                # we'll be able to see it
                errors.append('Host %s already has a platform: %s' % (
                              host.hostname, ', '.join(platforms)))
        if errors:
            raise model_logic.ValidationError({'labels': '; '.join(errors)})


    def is_dead(self):
        """Returns whether the host is dead (has status repair failed)."""
        return self.status == Host.Status.REPAIR_FAILED


    def active_queue_entry(self):
        """Returns the active queue entry for this host, or None if none."""
        active = list(self.hostqueueentry_set.filter(active=True))
        if not active:
            return None
        assert len(active) == 1, ('More than one active entry for '
                                  'host ' + self.hostname)
        return active[0]


    def _get_attribute_model_and_args(self, attribute):
        return HostAttribute, dict(host=self, attribute=attribute)


    class Meta:
        """Metadata for the Host class."""
        db_table = 'afe_hosts'

    def __unicode__(self):
        return unicode(self.hostname)


class HostAttribute(dbmodels.Model):
    """Arbitrary keyvals associated with hosts."""
    host = dbmodels.ForeignKey(Host)
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for the HostAttribute class."""
        db_table = 'afe_host_attributes'


class Test(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    author: author name
    description: description of the test
    name: test name
    time: short, medium, long
    test_class: This describes the class the test belongs in.
    test_category: This describes the category for your tests
    test_type: Client or Server
    path: path to pass to run_test()
    sync_count:  is a number >=1 (1 being the default). If it's 1, then it's an
                 async job. If it's >1 it's sync job for that number of machines
                 i.e. if sync_count = 2 it is a sync job that requires two
                 machines.
    Optional:
    dependencies: What the test requires to run. Comma delimited list
    dependency_labels: many-to-many relationship with labels corresponding to
                       test dependencies.
    experimental: If this is set to True production servers will ignore the test
    run_verify: Whether or not the scheduler should run the verify stage
    run_reset: Whether or not the scheduler should run the reset stage
    test_retry: Number of times to retry test if the test did not complete
                successfully. (optional, default: 0)
    """
    TestTime = enum.Enum('SHORT', 'MEDIUM', 'LONG', start_value=1)

    name = dbmodels.CharField(max_length=255, unique=True)
    author = dbmodels.CharField(max_length=255)
    test_class = dbmodels.CharField(max_length=255)
    test_category = dbmodels.CharField(max_length=255)
    dependencies = dbmodels.CharField(max_length=255, blank=True)
    description = dbmodels.TextField(blank=True)
    experimental = dbmodels.BooleanField(default=True)
    run_verify = dbmodels.BooleanField(default=False)
    test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(),
                                           default=TestTime.MEDIUM)
    test_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices())
    sync_count = dbmodels.IntegerField(default=1)
    path = dbmodels.CharField(max_length=255, unique=True)
    test_retry = dbmodels.IntegerField(blank=True, default=0)
    run_reset = dbmodels.BooleanField(default=True)

    dependency_labels = (
        dbmodels.ManyToManyField(Label, blank=True,
                                 db_table='afe_autotests_dependency_labels'))
    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def admin_description(self):
        """Returns a string representing the admin description."""
        escaped_description = saxutils.escape(self.description)
        return '<span style="white-space:pre">%s</span>' % escaped_description
    admin_description.allow_tags = True
    admin_description.short_description = 'Description'


    class Meta:
        """Metadata for class Test."""
        db_table = 'afe_autotests'

    def __unicode__(self):
        return unicode(self.name)


class TestParameter(dbmodels.Model):
    """
    A declared parameter of a test
    """
    test = dbmodels.ForeignKey(Test)
    name = dbmodels.CharField(max_length=255)

    class Meta:
        """Metadata for class TestParameter."""
        db_table = 'afe_test_parameters'
        unique_together = ('test', 'name')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.test.name)


class Profiler(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    name: profiler name
    test_type: Client or Server

    Optional:
    description: arbitrary text description
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    class Meta:
        """Metadata for class Profiler."""
        db_table = 'afe_profilers'

    def __unicode__(self):
        return unicode(self.name)


class AclGroup(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    name: name of ACL group

    Optional:
    description: arbitrary description of group
    """

    SERIALIZATION_LINKS_TO_FOLLOW = set(['users'])

    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.CharField(max_length=255, blank=True)
    users = dbmodels.ManyToManyField(User, blank=False,
                                     db_table='afe_acl_groups_users')
    hosts = dbmodels.ManyToManyField(Host, blank=True,
                                     db_table='afe_acl_groups_hosts')

    name_field = 'name'
    objects = model_logic.ExtendedManager()

    @staticmethod
    def check_for_acl_violation_hosts(hosts):
        """Verify the current user has access to the specified hosts.

        @param hosts: The hosts to verify against.
        @raises AclAccessViolation if the current user doesn't have access
            to a host.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        accessible_host_ids = set(
            host.id for host in Host.objects.filter(aclgroup__users=user))
        for host in hosts:
            # Check if the user has access to this host,
            # but only if it is not a metahost or a one-time-host.
            no_access = (isinstance(host, Host)
                         and not host.invalid
                         and int(host.id) not in accessible_host_ids)
            if no_access:
                raise AclAccessViolation("%s does not have access to %s" %
                                         (str(user), str(host)))


    @staticmethod
    def check_abort_permissions(queue_entries):
        """Look for queue entries that aren't abortable by the current user.

        An entry is not abortable if:
           * the job isn't owned by this user, and
           * the machine isn't ACL-accessible, or
           * the machine is in the "Everyone" ACL

        @param queue_entries: The queue entries to check.
        @raises AclAccessViolation if a queue entry is not abortable by the
            current user.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        not_owned = queue_entries.exclude(job__owner=user.login)
        # I do this using ID sets instead of just Django filters because
        # filtering on M2M dbmodels is broken in Django 0.96. It's better in
        # 1.0.
        # TODO: Use Django filters, now that we're using 1.0.
        accessible_ids = set(
            entry.id for entry
            in not_owned.filter(host__aclgroup__users__login=user.login))
        public_ids = set(entry.id for entry
                         in not_owned.filter(host__aclgroup__name='Everyone'))
        cannot_abort = [entry for entry in not_owned.select_related()
                        if entry.id not in accessible_ids
                        or entry.id in public_ids]
        if len(cannot_abort) == 0:
            return
        entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner,
                                              entry.host_or_metahost_name())
                                for entry in cannot_abort)
        raise AclAccessViolation('You cannot abort the following job entries: '
                                 + entry_names)


    def check_for_acl_violation_acl_group(self):
        """Verifies the current user has access to this ACL group.

        @raises AclAccessViolation if the current user doesn't have access to
            this ACL group.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        if self.name == 'Everyone':
            raise AclAccessViolation("You cannot modify 'Everyone'!")
        if not user in self.users.all():
            raise AclAccessViolation("You do not have access to %s"
                                     % self.name)

    @staticmethod
    def on_host_membership_change():
        """Invoked when host membership changes."""
        everyone = AclGroup.objects.get(name='Everyone')

        # find hosts that aren't in any ACL group and add them to Everyone
        # TODO(showard): this is a bit of a hack, since the fact that this query
        # works is kind of a coincidence of Django internals.  This trick
        # doesn't work in general (on all foreign key relationships).  I'll
        # replace it with a better technique when the need arises.
        orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True)
        everyone.hosts.add(*orphaned_hosts.distinct())

        # find hosts in both Everyone and another ACL group, and remove them
        # from Everyone
        hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone')
        acled_hosts = set()
        for host in hosts_in_everyone:
            # Has an ACL group other than Everyone
            if host.aclgroup_set.count() > 1:
                acled_hosts.add(host)
        everyone.hosts.remove(*acled_hosts)


    def delete(self):
        """Delete the group; 'Everyone' may never be deleted."""
        if (self.name == 'Everyone'):
            raise AclAccessViolation("You cannot delete 'Everyone'!")
        self.check_for_acl_violation_acl_group()
        super(AclGroup, self).delete()
        self.on_host_membership_change()


    def add_current_user_if_empty(self):
        """Adds the current user if the set of users is empty."""
        if not self.users.count():
            self.users.add(User.current_user())


    def perform_after_save(self, change):
        """Called after a save.

        @param change: Whether there was a change.
        """
        if not change:
            self.users.add(User.current_user())
        self.add_current_user_if_empty()
        self.on_host_membership_change()


    def save(self, *args, **kwargs):
        """Save the group after verifying the user may modify it."""
        change = bool(self.id)
        if change:
            # Check the original object for an ACL violation
            AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group()
        super(AclGroup, self).save(*args, **kwargs)
        self.perform_after_save(change)


    class Meta:
        """Metadata for class AclGroup."""
        db_table = 'afe_acl_groups'

    def __unicode__(self):
        return unicode(self.name)


class Kernel(dbmodels.Model):
    """
    A kernel configuration for a parameterized job
    """
    version = dbmodels.CharField(max_length=255)
    cmdline = dbmodels.CharField(max_length=255, blank=True)

    @classmethod
    def create_kernels(cls, kernel_list):
        """Creates all kernels in the kernel list.

        @param cls: Implicit class object.
        @param kernel_list: A list of dictionaries that describe the kernels,
            in the same format as the 'kernel' argument to
            rpc_interface.generate_control_file.
        @return A list of the created kernels.
        """
        if not kernel_list:
            return None
        return [cls._create(kernel) for kernel in kernel_list]


    @classmethod
    def _create(cls, kernel_dict):
        # Pop the known keys so anything left over signals a caller error.
        version = kernel_dict.pop('version')
        cmdline = kernel_dict.pop('cmdline', '')

        if kernel_dict:
            raise Exception('Extraneous kernel arguments remain: %r'
                            % kernel_dict)

        kernel, _ = cls.objects.get_or_create(version=version,
                                              cmdline=cmdline)
        return kernel


    class Meta:
        """Metadata for class Kernel."""
        db_table = 'afe_kernels'
        unique_together = ('version', 'cmdline')

    def __unicode__(self):
        return u'%s %s' % (self.version, self.cmdline)


class ParameterizedJob(dbmodels.Model):
    """
    Auxiliary configuration for a parameterized job.
    """
    test = dbmodels.ForeignKey(Test)
    label = dbmodels.ForeignKey(Label, null=True)
    use_container = dbmodels.BooleanField(default=False)
    profile_only = dbmodels.BooleanField(default=False)
    upload_kernel_config = dbmodels.BooleanField(default=False)

    kernels = dbmodels.ManyToManyField(
            Kernel, db_table='afe_parameterized_job_kernels')
    profilers = dbmodels.ManyToManyField(
            Profiler, through='ParameterizedJobProfiler')


    @classmethod
    def smart_get(cls, id_or_name, *args, **kwargs):
        """For compatibility with Job.add_object.

        @param cls: Implicit class object.
        @param id_or_name: The ID or name to get.
        @param args: Non-keyword arguments.
        @param kwargs: Keyword arguments.
        """
        return cls.objects.get(pk=id_or_name)


    def job(self):
        """Returns the job if it exists, or else None."""
        jobs = self.job_set.all()
        assert jobs.count() <= 1
        return jobs and jobs[0] or None


    class Meta:
        """Metadata for class ParameterizedJob."""
        db_table = 'afe_parameterized_jobs'

    def __unicode__(self):
        return u'%s (parameterized) - %s' % (self.test.name, self.job())


class ParameterizedJobProfiler(dbmodels.Model):
    """
    A profiler to run on a parameterized job
    """
    parameterized_job = dbmodels.ForeignKey(ParameterizedJob)
    profiler = dbmodels.ForeignKey(Profiler)

    class Meta:
        """Metadata for class ParameterizedJobProfiler."""
        db_table = 'afe_parameterized_jobs_profilers'
        unique_together = ('parameterized_job', 'profiler')


class ParameterizedJobProfilerParameter(dbmodels.Model):
    """
    A parameter for a profiler in a parameterized job
    """
    parameterized_job_profiler = dbmodels.ForeignKey(ParameterizedJobProfiler)
    parameter_name = dbmodels.CharField(max_length=255)
    parameter_value = dbmodels.TextField()
    parameter_type = dbmodels.CharField(
            max_length=8, choices=model_attributes.ParameterTypes.choices())

    class Meta:
        """Metadata for class ParameterizedJobProfilerParameter."""
        db_table = 'afe_parameterized_job_profiler_parameters'
        unique_together = ('parameterized_job_profiler', 'parameter_name')

    def __unicode__(self):
        return u'%s - %s' % (self.parameterized_job_profiler.profiler.name,
                             self.parameter_name)


class ParameterizedJobParameter(dbmodels.Model):
    """
    Parameters for a parameterized job
    """
    parameterized_job = dbmodels.ForeignKey(ParameterizedJob)
    test_parameter = dbmodels.ForeignKey(TestParameter)
    parameter_value = dbmodels.TextField()
    parameter_type = dbmodels.CharField(
            max_length=8, choices=model_attributes.ParameterTypes.choices())

    class Meta:
        """Metadata for class ParameterizedJobParameter."""
        db_table = 'afe_parameterized_job_parameters'
        unique_together = ('parameterized_job', 'test_parameter')

    def __unicode__(self):
        return u'%s - %s' % (self.parameterized_job.job().name,
                             self.test_parameter.name)


class JobManager(model_logic.ExtendedManager):
    'Custom manager to provide efficient status counts querying.'
    def get_status_counts(self, job_ids):
        """Returns a dict mapping the given job IDs to their status count dicts.

        @param job_ids: A list of job IDs.
        """
        if not job_ids:
            return {}
        # Safe to interpolate: every element is str(int), not user text.
        id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids)
        cursor = connection.cursor()
        cursor.execute("""
            SELECT job_id, status, aborted, complete, COUNT(*)
            FROM afe_host_queue_entries
            WHERE job_id IN %s
            GROUP BY job_id, status, aborted, complete
            """ % id_list)
        all_job_counts = dict((job_id, {}) for job_id in job_ids)
        for job_id, status, aborted, complete, count in cursor.fetchall():
            job_dict = all_job_counts[job_id]
            full_status = HostQueueEntry.compute_full_status(status, aborted,
                                                             complete)
            job_dict.setdefault(full_status, 0)
            job_dict[full_status] += count
        return all_job_counts


class Job(dbmodels.Model, model_logic.ModelExtensions):
    """\
    owner: username of job owner
    name: job name (does not have to be unique)
    priority: Integer priority value.  Higher is more important.
    control_file: contents of control file
    control_type: Client or Server
    created_on: date of job creation
    submitted_on: date of job submission
    synch_count: how many hosts should be used per autoserv execution
    run_verify: Whether or not to run the verify phase
    run_reset: Whether or not to run the reset phase
    timeout: DEPRECATED - hours from queuing time until job times out
    timeout_mins: minutes from job queuing time until the job times out
    max_runtime_hrs: DEPRECATED - hours from job starting time until job
                     times out
    max_runtime_mins: minutes from job starting time until job times out
    email_list: list of people to email on completion delimited by any of:
                white space, ',', ':', ';'
    dependency_labels: many-to-many relationship with labels corresponding to
                       job dependencies
    reboot_before: Never, If dirty, or Always
    reboot_after: Never, If all tests passed, or Always
    parse_failed_repair: if True, a failed repair launched by this job will have
                         its results parsed as part of the job.
    drone_set: The set of drones to run this job on
    parent_job: Parent job (optional)
    test_retry: Number of times to retry test if the test did not complete
                successfully. (optional, default: 0)
    """

    # TODO: Investigate, if jobkeyval_set is really needed.
    # dynamic_suite will write them into an attached file for the drone, but
    # it doesn't seem like they are actually used. If they aren't used, remove
    # jobkeyval_set here.
1084 SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels', 1085 'hostqueueentry_set', 1086 'jobkeyval_set', 1087 'shard']) 1088 1089 1090 def _deserialize_relation(self, link, data): 1091 if link in ['hostqueueentry_set', 'jobkeyval_set']: 1092 for obj in data: 1093 obj['job_id'] = self.id 1094 1095 super(Job, self)._deserialize_relation(link, data) 1096 1097 1098 def custom_deserialize_relation(self, link, data): 1099 assert link == 'shard', 'Link %s should not be deserialized' % link 1100 self.shard = Shard.deserialize(data) 1101 1102 1103 def sanity_check_update_from_shard(self, shard, updated_serialized): 1104 if self.shard_id != shard.id: 1105 raise error.UnallowedRecordsSentToMaster( 1106 'Job id=%s is assigned to shard (%s). Cannot update it with %s ' 1107 'from shard %s.' % (self.id, self.shard_id, updated_serialized, 1108 shard.id)) 1109 1110 1111 # TIMEOUT is deprecated. 1112 DEFAULT_TIMEOUT = global_config.global_config.get_config_value( 1113 'AUTOTEST_WEB', 'job_timeout_default', default=24) 1114 DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value( 1115 'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60) 1116 # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is 1117 # completed. 
1118 DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value( 1119 'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72) 1120 DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value( 1121 'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60) 1122 DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value( 1123 'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool, 1124 default=False) 1125 1126 owner = dbmodels.CharField(max_length=255) 1127 name = dbmodels.CharField(max_length=255) 1128 priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT) 1129 control_file = dbmodels.TextField(null=True, blank=True) 1130 control_type = dbmodels.SmallIntegerField( 1131 choices=control_data.CONTROL_TYPE.choices(), 1132 blank=True, # to allow 0 1133 default=control_data.CONTROL_TYPE.CLIENT) 1134 created_on = dbmodels.DateTimeField() 1135 synch_count = dbmodels.IntegerField(blank=True, default=0) 1136 timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT) 1137 run_verify = dbmodels.BooleanField(default=False) 1138 email_list = dbmodels.CharField(max_length=250, blank=True) 1139 dependency_labels = ( 1140 dbmodels.ManyToManyField(Label, blank=True, 1141 db_table='afe_jobs_dependency_labels')) 1142 reboot_before = dbmodels.SmallIntegerField( 1143 choices=model_attributes.RebootBefore.choices(), blank=True, 1144 default=DEFAULT_REBOOT_BEFORE) 1145 reboot_after = dbmodels.SmallIntegerField( 1146 choices=model_attributes.RebootAfter.choices(), blank=True, 1147 default=DEFAULT_REBOOT_AFTER) 1148 parse_failed_repair = dbmodels.BooleanField( 1149 default=DEFAULT_PARSE_FAILED_REPAIR) 1150 # max_runtime_hrs is deprecated. Will be removed after switch to mins is 1151 # completed. 
1152 max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS) 1153 max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS) 1154 drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True) 1155 1156 parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True, 1157 blank=True) 1158 1159 parent_job = dbmodels.ForeignKey('self', blank=True, null=True) 1160 1161 test_retry = dbmodels.IntegerField(blank=True, default=0) 1162 1163 run_reset = dbmodels.BooleanField(default=True) 1164 1165 timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS) 1166 1167 # If this is None on the master, a slave should be found. 1168 # If this is None on a slave, it should be synced back to the master 1169 shard = dbmodels.ForeignKey(Shard, blank=True, null=True) 1170 1171 # custom manager 1172 objects = JobManager() 1173 1174 1175 @decorators.cached_property 1176 def labels(self): 1177 """All the labels of this job""" 1178 # We need to convert dependency_labels to a list, because all() gives us 1179 # back an iterator, and storing/caching an iterator means we'd only be 1180 # able to read from it once. 1181 return list(self.dependency_labels.all()) 1182 1183 1184 def is_server_job(self): 1185 """Returns whether this job is of type server.""" 1186 return self.control_type == control_data.CONTROL_TYPE.SERVER 1187 1188 1189 @classmethod 1190 def parameterized_jobs_enabled(cls): 1191 """Returns whether parameterized jobs are enabled. 1192 1193 @param cls: Implicit class object. 1194 """ 1195 return global_config.global_config.get_config_value( 1196 'AUTOTEST_WEB', 'parameterized_jobs', type=bool) 1197 1198 1199 @classmethod 1200 def check_parameterized_job(cls, control_file, parameterized_job): 1201 """Checks that the job is valid given the global config settings. 1202 1203 First, either control_file must be set, or parameterized_job must be 1204 set, but not both. 
Second, parameterized_job must be set if and only if 1205 the parameterized_jobs option in the global config is set to True. 1206 1207 @param cls: Implict class object. 1208 @param control_file: A control file. 1209 @param parameterized_job: A parameterized job. 1210 """ 1211 if not (bool(control_file) ^ bool(parameterized_job)): 1212 raise Exception('Job must have either control file or ' 1213 'parameterization, but not both') 1214 1215 parameterized_jobs_enabled = cls.parameterized_jobs_enabled() 1216 if control_file and parameterized_jobs_enabled: 1217 raise Exception('Control file specified, but parameterized jobs ' 1218 'are enabled') 1219 if parameterized_job and not parameterized_jobs_enabled: 1220 raise Exception('Parameterized job specified, but parameterized ' 1221 'jobs are not enabled') 1222 1223 1224 @classmethod 1225 def create(cls, owner, options, hosts): 1226 """Creates a job. 1227 1228 The job is created by taking some information (the listed args) and 1229 filling in the rest of the necessary information. 1230 1231 @param cls: Implicit class object. 1232 @param owner: The owner for the job. 1233 @param options: An options object. 1234 @param hosts: The hosts to use. 1235 """ 1236 AclGroup.check_for_acl_violation_hosts(hosts) 1237 1238 control_file = options.get('control_file') 1239 parameterized_job = options.get('parameterized_job') 1240 1241 # The current implementation of parameterized jobs requires that only 1242 # control files or parameterized jobs are used. Using the image 1243 # parameter on autoupdate_ParameterizedJob doesn't mix pure 1244 # parameterized jobs and control files jobs, it does muck enough with 1245 # normal jobs by adding a parameterized id to them that this check will 1246 # fail. So for now we just skip this check. 
1247 # cls.check_parameterized_job(control_file=control_file, 1248 # parameterized_job=parameterized_job) 1249 user = User.current_user() 1250 if options.get('reboot_before') is None: 1251 options['reboot_before'] = user.get_reboot_before_display() 1252 if options.get('reboot_after') is None: 1253 options['reboot_after'] = user.get_reboot_after_display() 1254 1255 drone_set = DroneSet.resolve_name(options.get('drone_set')) 1256 1257 if options.get('timeout_mins') is None and options.get('timeout'): 1258 options['timeout_mins'] = options['timeout'] * 60 1259 1260 job = cls.add_object( 1261 owner=owner, 1262 name=options['name'], 1263 priority=options['priority'], 1264 control_file=control_file, 1265 control_type=options['control_type'], 1266 synch_count=options.get('synch_count'), 1267 # timeout needs to be deleted in the future. 1268 timeout=options.get('timeout'), 1269 timeout_mins=options.get('timeout_mins'), 1270 max_runtime_mins=options.get('max_runtime_mins'), 1271 run_verify=options.get('run_verify'), 1272 email_list=options.get('email_list'), 1273 reboot_before=options.get('reboot_before'), 1274 reboot_after=options.get('reboot_after'), 1275 parse_failed_repair=options.get('parse_failed_repair'), 1276 created_on=datetime.now(), 1277 drone_set=drone_set, 1278 parameterized_job=parameterized_job, 1279 parent_job=options.get('parent_job_id'), 1280 test_retry=options.get('test_retry'), 1281 run_reset=options.get('run_reset')) 1282 1283 job.dependency_labels = options['dependencies'] 1284 1285 if options.get('keyvals'): 1286 for key, value in options['keyvals'].iteritems(): 1287 JobKeyval.objects.create(job=job, key=key, value=value) 1288 1289 return job 1290 1291 1292 @classmethod 1293 def assign_to_shard(cls, shard): 1294 """Assigns unassigned jobs to a shard. 1295 1296 All jobs that have the platform label that was assigned to the given 1297 shard are assigned to the shard and returned. 1298 @param shard: The shard to assign jobs to. 
1299 @returns The job objects that should be sent to the shard. 1300 """ 1301 # Disclaimer: Concurrent heartbeats should not occur in today's setup. 1302 # If this changes or they are triggered manually, this applies: 1303 # Jobs may be returned more than once by concurrent calls of this 1304 # function, as there is a race condition between SELECT and UPDATE. 1305 job_ids = list(Job.objects.filter( 1306 shard=None, 1307 dependency_labels=shard.labels.all() 1308 ).exclude( 1309 hostqueueentry__complete=True 1310 ).exclude( 1311 hostqueueentry__active=True 1312 ).values_list('pk', flat=True)) 1313 if job_ids: 1314 Job.objects.filter(pk__in=job_ids).update(shard=shard) 1315 return list(Job.objects.filter(pk__in=job_ids).all()) 1316 return [] 1317 1318 1319 def save(self, *args, **kwargs): 1320 # The current implementation of parameterized jobs requires that only 1321 # control files or parameterized jobs are used. Using the image 1322 # parameter on autoupdate_ParameterizedJob doesn't mix pure 1323 # parameterized jobs and control files jobs, it does muck enough with 1324 # normal jobs by adding a parameterized id to them that this check will 1325 # fail. So for now we just skip this check. 1326 # cls.check_parameterized_job(control_file=self.control_file, 1327 # parameterized_job=self.parameterized_job) 1328 super(Job, self).save(*args, **kwargs) 1329 1330 1331 def queue(self, hosts, atomic_group=None, is_template=False): 1332 """Enqueue a job on the given hosts. 1333 1334 @param hosts: The hosts to use. 1335 @param atomic_group: The associated atomic group. 1336 @param is_template: Whether the status should be "Template". 1337 """ 1338 if not hosts: 1339 if atomic_group: 1340 # No hosts or labels are required to queue an atomic group 1341 # Job. However, if they are given, we respect them below. 
1342 atomic_group.enqueue_job(self, is_template=is_template) 1343 else: 1344 # hostless job 1345 entry = HostQueueEntry.create(job=self, is_template=is_template) 1346 entry.save() 1347 return 1348 1349 for host in hosts: 1350 host.enqueue_job(self, atomic_group=atomic_group, 1351 is_template=is_template) 1352 1353 1354 def create_recurring_job(self, start_date, loop_period, loop_count, owner): 1355 """Creates a recurring job. 1356 1357 @param start_date: The starting date of the job. 1358 @param loop_period: How often to re-run the job, in seconds. 1359 @param loop_count: The re-run count. 1360 @param owner: The owner of the job. 1361 """ 1362 rec = RecurringRun(job=self, start_date=start_date, 1363 loop_period=loop_period, 1364 loop_count=loop_count, 1365 owner=User.objects.get(login=owner)) 1366 rec.save() 1367 return rec.id 1368 1369 1370 def user(self): 1371 """Gets the user of this job, or None if it doesn't exist.""" 1372 try: 1373 return User.objects.get(login=self.owner) 1374 except self.DoesNotExist: 1375 return None 1376 1377 1378 def abort(self): 1379 """Aborts this job.""" 1380 for queue_entry in self.hostqueueentry_set.all(): 1381 queue_entry.abort() 1382 1383 1384 def tag(self): 1385 """Returns a string tag for this job.""" 1386 return '%s-%s' % (self.id, self.owner) 1387 1388 1389 def keyval_dict(self): 1390 """Returns all keyvals for this job as a dictionary.""" 1391 return dict((keyval.key, keyval.value) 1392 for keyval in self.jobkeyval_set.all()) 1393 1394 1395 class Meta: 1396 """Metadata for class Job.""" 1397 db_table = 'afe_jobs' 1398 1399 def __unicode__(self): 1400 return u'%s (%s-%s)' % (self.name, self.id, self.owner) 1401 1402 1403class JobKeyval(dbmodels.Model, model_logic.ModelExtensions): 1404 """Keyvals associated with jobs""" 1405 job = dbmodels.ForeignKey(Job) 1406 key = dbmodels.CharField(max_length=90) 1407 value = dbmodels.CharField(max_length=300) 1408 1409 objects = model_logic.ExtendedManager() 1410 1411 class Meta: 1412 
"""Metadata for class JobKeyval.""" 1413 db_table = 'afe_job_keyvals' 1414 1415 1416class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions): 1417 """Represents an ineligible host queue.""" 1418 job = dbmodels.ForeignKey(Job) 1419 host = dbmodels.ForeignKey(Host) 1420 1421 objects = model_logic.ExtendedManager() 1422 1423 class Meta: 1424 """Metadata for class IneligibleHostQueue.""" 1425 db_table = 'afe_ineligible_host_queues' 1426 1427 1428class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): 1429 """Represents a host queue entry.""" 1430 1431 SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host']) 1432 1433 1434 def custom_deserialize_relation(self, link, data): 1435 assert link == 'meta_host' 1436 self.meta_host = Label.deserialize(data) 1437 1438 1439 def sanity_check_update_from_shard(self, shard, updated_serialized, 1440 job_ids_sent): 1441 if self.job_id not in job_ids_sent: 1442 raise error.UnallowedRecordsSentToMaster( 1443 'Sent HostQueueEntry without corresponding ' 1444 'job entry: %s' % updated_serialized) 1445 1446 1447 Status = host_queue_entry_states.Status 1448 ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES 1449 COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES 1450 1451 job = dbmodels.ForeignKey(Job) 1452 host = dbmodels.ForeignKey(Host, blank=True, null=True) 1453 status = dbmodels.CharField(max_length=255) 1454 meta_host = dbmodels.ForeignKey(Label, blank=True, null=True, 1455 db_column='meta_host') 1456 active = dbmodels.BooleanField(default=False) 1457 complete = dbmodels.BooleanField(default=False) 1458 deleted = dbmodels.BooleanField(default=False) 1459 execution_subdir = dbmodels.CharField(max_length=255, blank=True, 1460 default='') 1461 # If atomic_group is set, this is a virtual HostQueueEntry that will 1462 # be expanded into many actual hosts within the group at schedule time. 
    atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True)
    aborted = dbmodels.BooleanField(default=False)
    started_on = dbmodels.DateTimeField(null=True, blank=True)
    finished_on = dbmodels.DateTimeField(null=True, blank=True)

    objects = model_logic.ExtendedManager()


    def __init__(self, *args, **kwargs):
        """Initializes the entry and starts tracking 'status' changes.

        @param args: Positional arguments forwarded to Model.__init__.
        @param kwargs: Keyword arguments forwarded to Model.__init__.
        """
        super(HostQueueEntry, self).__init__(*args, **kwargs)
        # Record the initial value of 'status' so that save() can detect a
        # change and fire on_attribute_changed.
        self._record_attributes(['status'])


    @classmethod
    def create(cls, job, host=None, meta_host=None, atomic_group=None,
               is_template=False):
        """Creates a new host queue entry.

        @param cls: Implicit class object.
        @param job: The associated job.
        @param host: The associated host.
        @param meta_host: The associated meta host.
        @param atomic_group: The associated atomic group.
        @param is_template: Whether the status should be "Template".
        """
        if is_template:
            status = cls.Status.TEMPLATE
        else:
            status = cls.Status.QUEUED

        # Note: the entry is constructed but NOT saved; callers must call
        # save() themselves.
        return cls(job=job, host=host, meta_host=meta_host,
                   atomic_group=atomic_group, status=status)


    def save(self, *args, **kwargs):
        """Saves the entry, deriving active/complete from status first.

        @param args: Positional arguments forwarded to Model.save.
        @param kwargs: Keyword arguments forwarded to Model.save.
        """
        self._set_active_and_complete()
        super(HostQueueEntry, self).save(*args, **kwargs)
        # Fires on_attribute_changed if 'status' changed since load.
        self._check_for_updated_attributes()


    def execution_path(self):
        """
        Path to this entry's results (relative to the base results directory).
        """
        return os.path.join(self.job.tag(), self.execution_subdir)


    def host_or_metahost_name(self):
        """Returns the first non-None name found in priority order.

        The priority order checked is: (1) host name; (2) meta host name; and
        (3) atomic group name.
        """
        if self.host:
            return self.host.hostname
        elif self.meta_host:
            return self.meta_host.name
        else:
            assert self.atomic_group, "no host, meta_host or atomic group!"
            return self.atomic_group.name


    def _set_active_and_complete(self):
        """Derives the active/complete flags from the current status."""
        if self.status in self.ACTIVE_STATUSES:
            self.active, self.complete = True, False
        elif self.status in self.COMPLETE_STATUSES:
            self.active, self.complete = False, True
        else:
            self.active, self.complete = False, False


    def on_attribute_changed(self, attribute, old_value):
        """Logs status transitions; only 'status' is tracked.

        @param attribute: Name of the changed attribute; must be 'status'.
        @param old_value: The attribute's previous value (unused; the new
                          value is read from self.status).
        """
        assert attribute == 'status'
        logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id,
                     self.status)


    def is_meta_host_entry(self):
        'True if this is a entry has a meta_host instead of a host.'
        return self.host is None and self.meta_host is not None


    # This code is shared between rpc_interface and models.HostQueueEntry.
    # Sadly due to circular imports between the 2 (crbug.com/230100) making it
    # a class method was the best way to refactor it. Attempting to put it in
    # rpc_utils or a new utils module failed as that would require us to import
    # models.py but to call it from here we would have to import the utils.py
    # thus creating a cycle.
    @classmethod
    def abort_host_queue_entries(cls, host_queue_entries):
        """Aborts a collection of host_queue_entries.

        Abort these host queue entry and all host queue entries of jobs created
        by them.

        @param host_queue_entries: List of host queue entries we want to abort.
        """
        # This isn't completely immune to race conditions since it's not atomic,
        # but it should be safe given the scheduler's behavior.

        # TODO(milleral): crbug.com/230100
        # The |abort_host_queue_entries| rpc does nearly exactly this,
        # however, trying to re-use the code generates some horrible
        # circular import error.  I'd be nice to refactor things around
        # sometime so the code could be reused.

        # Fixpoint algorithm to find the whole tree of HQEs to abort to
        # minimize the total number of database queries:
        children = set()
        new_children = set(host_queue_entries)
        while new_children:
            children.update(new_children)
            new_child_ids = [hqe.job_id for hqe in new_children]
            new_children = HostQueueEntry.objects.filter(
                    job__parent_job__in=new_child_ids,
                    complete=False, aborted=False).all()
            # To handle circular parental relationships
            new_children = set(new_children) - children

        # Associate a user with the host queue entries that we're about
        # to abort so that we can look up who to blame for the aborts.
        now = datetime.now()
        user = User.current_user()
        aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe,
                aborted_by=user, aborted_on=now) for hqe in children]
        AbortedHostQueueEntry.objects.bulk_create(aborted_hqes)
        # Bulk update all of the HQEs to set the abort bit.
        child_ids = [hqe.id for hqe in children]
        HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True)


    def abort(self):
        """ Aborts this host queue entry.

        Abort this host queue entry and all host queue entries of jobs created by
        this one.

        """
        # Already-finished or already-aborted entries are left untouched.
        if not self.complete and not self.aborted:
            HostQueueEntry.abort_host_queue_entries([self])


    @classmethod
    def compute_full_status(cls, status, aborted, complete):
        """Returns a modified status msg if the host queue entry was aborted.

        @param cls: Implicit class object.
        @param status: The original status message.
        @param aborted: Whether the host queue entry was aborted.
        @param complete: Whether the host queue entry was completed.
        """
        if aborted and not complete:
            return 'Aborted (%s)' % status
        return status


    def full_status(self):
        """Returns the full status of this host queue entry, as a string."""
        return self.compute_full_status(self.status, self.aborted,
                                        self.complete)


    def _postprocess_object_dict(self, object_dict):
        """Adds the computed 'full_status' to serialized dictionaries."""
        object_dict['full_status'] = self.full_status()


    class Meta:
        """Metadata for class HostQueueEntry."""
        db_table = 'afe_host_queue_entries'



    def __unicode__(self):
        hostname = None
        if self.host:
            hostname = self.host.hostname
        return u"%s/%d (%d)" % (hostname, self.job.id, self.id)


class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
    """Represents an aborted host queue entry."""
    queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True)
    aborted_by = dbmodels.ForeignKey(User)
    aborted_on = dbmodels.DateTimeField()

    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Saves the record, stamping aborted_on with the current time.

        Note: any aborted_on value set by the caller is overwritten here.

        @param args: Positional arguments forwarded to Model.save.
        @param kwargs: Keyword arguments forwarded to Model.save.
        """
        self.aborted_on = datetime.now()
        super(AbortedHostQueueEntry, self).save(*args, **kwargs)

    class Meta:
        """Metadata for class AbortedHostQueueEntry."""
        db_table = 'afe_aborted_host_queue_entries'


class RecurringRun(dbmodels.Model, model_logic.ModelExtensions):
    """\
    job: job to use as a template
    owner: owner of the instantiated template
    start_date: Run the job at scheduled date
    loop_period: Re-run (loop) the job periodically
                 (in every loop_period seconds)
    loop_count: Re-run (loop) count
    """

    job = dbmodels.ForeignKey(Job)
    owner = dbmodels.ForeignKey(User)
    start_date = dbmodels.DateTimeField()
    loop_period = dbmodels.IntegerField(blank=True)
    loop_count = dbmodels.IntegerField(blank=True)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class RecurringRun."""
        db_table = 'afe_recurring_run'

    def __unicode__(self):
        return u'RecurringRun(job %s, start %s, period %s, count %s)' % (
            self.job.id, self.start_date, self.loop_period, self.loop_count)


class SpecialTask(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Tasks to run on hosts at the next time they are in the Ready state. Use this
    for high-priority tasks, such as forced repair or forced reinstall.

    host: host to run this task on
    task: special task to run
    time_requested: date and time the request for this task was made
    is_active: task is currently running
    is_complete: task has finished running
    is_aborted: task was aborted
    time_started: date and time the task started
    time_finished: date and time the task finished
    queue_entry: Host queue entry waiting on this task (or None, if task was not
                 started in preparation of a job)
    """
    Task = enum.Enum('Verify', 'Cleanup', 'Repair', 'Reset', 'Provision',
                     string_values=True)

    host = dbmodels.ForeignKey(Host, blank=False, null=False)
    task = dbmodels.CharField(max_length=64, choices=Task.choices(),
                              blank=False, null=False)
    requested_by = dbmodels.ForeignKey(User)
    time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False,
                                            null=False)
    is_active = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_complete = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_started = dbmodels.DateTimeField(null=True, blank=True)
    queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True)
    success = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_finished = dbmodels.DateTimeField(null=True, blank=True)

    objects = model_logic.ExtendedManager()


    def save(self, **kwargs):
        """Saves the task, attributing it to the owning job's user.

        If the task was spawned for a queue entry, requested_by is forced to
        that entry's job owner, overriding any value set by the caller.

        @param kwargs: Keyword arguments forwarded to Model.save.
        """
        if self.queue_entry:
            self.requested_by = User.objects.get(
                    login=self.queue_entry.job.owner)
        super(SpecialTask, self).save(**kwargs)


    def execution_path(self):
        """@see HostQueueEntry.execution_path()"""
        return 'hosts/%s/%s-%s' % (self.host.hostname, self.id,
                                   self.task.lower())


    # property to emulate HostQueueEntry.status
    @property
    def status(self):
        """
        Return a host queue entry status appropriate for this task. Although
        SpecialTasks are not HostQueueEntries, it is helpful to the user to
        present similar statuses.
        """
        if self.is_complete:
            if self.success:
                return HostQueueEntry.Status.COMPLETED
            return HostQueueEntry.Status.FAILED
        if self.is_active:
            return HostQueueEntry.Status.RUNNING
        return HostQueueEntry.Status.QUEUED


    # property to emulate HostQueueEntry.started_on
    @property
    def started_on(self):
        """Returns the time at which this special task started."""
        return self.time_started


    @classmethod
    def schedule_special_task(cls, host, task):
        """Schedules a special task on a host if not already scheduled.

        @param cls: Implicit class object.
        @param host: The host to use.
        @param task: The task to schedule.
        """
        existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task,
                                                    is_active=False,
                                                    is_complete=False)
        if existing_tasks:
            # Deduplicate: return the already-pending task instead of
            # scheduling a second identical one.
            return existing_tasks[0]

        special_task = SpecialTask(host=host, task=task,
                                   requested_by=User.current_user())
        special_task.save()
        return special_task


    def abort(self):
        """ Abort this special task."""
        self.is_aborted = True
        self.save()


    def activate(self):
        """
        Sets a task as active and sets the time started to the current time.
        """
        logging.info('Starting: %s', self)
        self.is_active = True
        self.time_started = datetime.now()
        self.save()


    def finish(self, success):
        """Sets a task as completed.

        @param success: Whether or not the task was successful.
        """
        logging.info('Finished: %s', self)
        self.is_active = False
        self.is_complete = True
        self.success = success
        # time_finished is only stamped for tasks that actually ran;
        # tasks finished without ever starting keep it as None.
        if self.time_started:
            self.time_finished = datetime.now()
        self.save()


    class Meta:
        """Metadata for class SpecialTask."""
        db_table = 'afe_special_tasks'


    def __unicode__(self):
        result = u'Special Task %s (host %s, task %s, time %s)' % (
            self.id, self.host, self.task, self.time_requested)
        if self.is_complete:
            result += u' (completed)'
        elif self.is_active:
            result += u' (active)'

        return result