# models.py revision 121eee6338595777668e669c61521ceeceeeddf7
import logging
import os
from datetime import datetime
from django.db import models as dbmodels, connection
from xml.sax import saxutils
import common
from autotest_lib.frontend.afe import model_logic, model_attributes
from autotest_lib.frontend import settings, thread_local
from autotest_lib.client.common_lib import enum, host_protections, global_config
from autotest_lib.client.common_lib import host_queue_entry_states

# job options and user preferences
DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY
# Taken from RebootAfter (not RebootBefore) because this default is used for
# fields whose choices come from RebootAfter.choices().
DEFAULT_REBOOT_AFTER = model_attributes.RebootAfter.ALWAYS


class AclAccessViolation(Exception):
    """\
    Raised when an operation is attempted without proper permissions as
    dictated by ACLs.
    """


class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model):
    """\
    An atomic group defines a collection of hosts which must only be scheduled
    all at once.  Any host with a label having an atomic group will only be
    scheduled for a job at the same time as other hosts sharing that label.

    Required:
      name: A name for this atomic group.  ex: 'rack23' or 'funky_net'
      max_number_of_machines: The maximum number of machines that will be
              scheduled at once when scheduling jobs to this atomic group.
              The job.synch_count is considered the minimum.

    Optional:
      description: Arbitrary text description of this group's purpose.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)
    # This magic value is the default to simplify the scheduler logic.
    # It must be "large".  The common use of atomic groups is to want all
    # machines in the group to be used, limits on which subset used are
    # often chosen via dependency labels.
    INFINITE_MACHINES = 333333333
    max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES)
    invalid = dbmodels.BooleanField(default=False,
                                  editable=settings.FULL_ADMIN)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on an associated atomic group of hosts."""
        queue_entry = HostQueueEntry.create(atomic_group=self, job=job,
                                            is_template=is_template)
        queue_entry.save()


    def clean_object(self):
        """Remove every label association from this atomic group."""
        self.label_set.clear()


    class Meta:
        db_table = 'afe_atomic_groups'


    def __unicode__(self):
        return unicode(self.name)


class Label(model_logic.ModelWithInvalid, dbmodels.Model):
    """\
    Required:
      name: label name

    Optional:
      kernel_config: URL/path to kernel config for jobs run on this label.
      platform: If True, this is a platform label (defaults to False).
      only_if_needed: If True, a Host with this label can only be used if that
              label is requested by the job/test (either as the meta_host or
              in the job_dependencies).
      atomic_group: The atomic group associated with this label.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    kernel_config = dbmodels.CharField(max_length=255, blank=True)
    platform = dbmodels.BooleanField(default=False)
    invalid = dbmodels.BooleanField(default=False,
                                  editable=settings.FULL_ADMIN)
    only_if_needed = dbmodels.BooleanField(default=False)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True)


    def clean_object(self):
        """Remove every host and test association from this label."""
        self.host_set.clear()
        self.test_set.clear()


    def enqueue_job(self, job, atomic_group=None, is_template=False):
        """Enqueue a job on any host of this label."""
        queue_entry = HostQueueEntry.create(meta_host=self, job=job,
                                            is_template=is_template,
                                            atomic_group=atomic_group)
        queue_entry.save()


    class Meta:
        db_table = 'afe_labels'

    def __unicode__(self):
        return unicode(self.name)


class User(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    login :user login name

    Optional:
    access_level: 0=User (default), 1=Admin, 100=Root
    """
    ACCESS_ROOT = 100
    ACCESS_ADMIN = 1
    ACCESS_USER = 0

    AUTOTEST_SYSTEM = 'autotest_system'

    login = dbmodels.CharField(max_length=255, unique=True)
    access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True)

    # user preferences
    reboot_before = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootBefore.choices(), blank=True,
        default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootAfter.choices(), blank=True,
        default=DEFAULT_REBOOT_AFTER)
    show_experimental = dbmodels.BooleanField(default=False)

    name_field = 'login'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """\
        Save this user; a newly created user is added to the 'Everyone' ACL
        group.

        Raises AclAccessViolation if the thread-local user is neither a
        superuser nor the user being saved.
        """
        # is this a new object being saved for the first time?
        first_time = (self.id is None)
        user = thread_local.get_user()
        if user and not user.is_superuser() and user.login != self.login:
            raise AclAccessViolation("You cannot modify user " + self.login)
        super(User, self).save(*args, **kwargs)
        if first_time:
            everyone = AclGroup.objects.get(name='Everyone')
            everyone.users.add(self)


    def is_superuser(self):
        """Return True if this user's access level is at least ACCESS_ROOT."""
        return self.access_level >= self.ACCESS_ROOT


    @classmethod
    def current_user(cls):
        """\
        Return the thread-local user.  If none is set, fetch (or create) the
        AUTOTEST_SYSTEM user, give it root access, save it and return it.
        """
        user = thread_local.get_user()
        if user is None:
            user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM)
            user.access_level = cls.ACCESS_ROOT
            user.save()
        return user


    class Meta:
        db_table = 'afe_users'

    def __unicode__(self):
        return unicode(self.login)


class Host(model_logic.ModelWithInvalid, dbmodels.Model,
           model_logic.ModelWithAttributes):
    """\
    Required:
    hostname

    optional:
    locked: if true, host is locked and will not be queued

    Internal:
    synch_id: currently unused
    status: string describing status of host
    invalid: true if the host has been deleted
    protection: indicates what can be done to this host during repair
    locked_by: user that locked the host, or null if the host is unlocked
    lock_time: DateTime at which the host was locked
    dirty: true if the host has been used without being rebooted
    """
    Status = enum.Enum('Verifying', 'Running', 'Ready', 'Repairing',
                       'Repair Failed', 'Cleaning', 'Pending',
                       string_values=True)
    Protection = host_protections.Protection

    hostname = dbmodels.CharField(max_length=255, unique=True)
    labels = dbmodels.ManyToManyField(Label, blank=True,
                                      db_table='afe_hosts_labels')
    locked = dbmodels.BooleanField(default=False)
    synch_id = dbmodels.IntegerField(blank=True, null=True,
                                     editable=settings.FULL_ADMIN)
    status = dbmodels.CharField(max_length=255, default=Status.READY,
                                choices=Status.choices(),
                                editable=settings.FULL_ADMIN)
    invalid = dbmodels.BooleanField(default=False,
                                  editable=settings.FULL_ADMIN)
    protection = dbmodels.SmallIntegerField(null=False, blank=True,
                                            choices=host_protections.choices,
                                            default=host_protections.default)
    locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False)
    lock_time = dbmodels.DateTimeField(null=True, blank=True, editable=False)
    dirty = dbmodels.BooleanField(default=True, editable=settings.FULL_ADMIN)

    name_field = 'hostname'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()


    def __init__(self, *args, **kwargs):
        super(Host, self).__init__(*args, **kwargs)
        # track 'status' so on_attribute_changed() can report changes
        self._record_attributes(['status'])


    @staticmethod
    def create_one_time_host(hostname):
        """\
        Return an invalid (one-time) host for the given hostname, creating it
        if no host with that name exists.  The host is set to DO_NOT_REPAIR,
        unlocked, saved and stripped of its ACL groups and labels.

        Raises model_logic.ValidationError if a valid host with that hostname
        already exists.
        """
        query = Host.objects.filter(hostname=hostname)
        if query.count() == 0:
            host = Host(hostname=hostname, invalid=True)
            host.do_validate()
        else:
            host = query[0]
            if not host.invalid:
                raise model_logic.ValidationError({
                    'hostname' : '%s already exists in the autotest DB.  '
                        'Select it rather than entering it as a one time '
                        'host.' % hostname
                    })
        host.protection = host_protections.Protection.DO_NOT_REPAIR
        host.locked = False
        host.save()
        host.clean_object()
        return host


    def resurrect_object(self, old_object):
        super(Host, self).resurrect_object(old_object)
        # invalid hosts can be in use by the scheduler (as one-time hosts), so
        # don't change the status
        self.status = old_object.status


    def clean_object(self):
        """Remove this host from every ACL group and drop all its labels."""
        self.aclgroup_set.clear()
        self.labels.clear()


    def save(self, *args, **kwargs):
        """\
        Save this host, maintaining lock bookkeeping (locked_by, lock_time).
        Existing hosts are ACL-checked first; new hosts are added to the
        'Everyone' ACL group after saving.
        """
        # extra spaces in the hostname can be a sneaky source of errors
        self.hostname = self.hostname.strip()
        # is this a new object being saved for the first time?
        first_time = (self.id is None)
        if not first_time:
            AclGroup.check_for_acl_violation_hosts([self])
        if self.locked and not self.locked_by:
            # host has just been locked: record who locked it and when
            self.locked_by = User.current_user()
            self.lock_time = datetime.now()
            self.dirty = True
        elif not self.locked and self.locked_by:
            # host has just been unlocked: clear the lock bookkeeping
            self.locked_by = None
            self.lock_time = None
        super(Host, self).save(*args, **kwargs)
        if first_time:
            everyone = AclGroup.objects.get(name='Everyone')
            everyone.hosts.add(self)
        self._check_for_updated_attributes()


    def delete(self):
        """\
        Delete this host after an ACL check, marking each of its queue
        entries as deleted and aborting them first.
        """
        AclGroup.check_for_acl_violation_hosts([self])
        for queue_entry in self.hostqueueentry_set.all():
            queue_entry.deleted = True
            queue_entry.abort()
        super(Host, self).delete()


    def on_attribute_changed(self, attribute, old_value):
        """Log the new status; 'status' is the only attribute tracked."""
        assert attribute == 'status'
        logging.info(self.hostname + ' -> ' + self.status)


    def enqueue_job(self, job, atomic_group=None, is_template=False):
        """Enqueue a job on this host."""
        queue_entry = HostQueueEntry.create(host=self, job=job,
                                            is_template=is_template,
                                            atomic_group=atomic_group)
        # allow recovery of dead hosts from the frontend
        if not self.active_queue_entry() and self.is_dead():
            self.status = Host.Status.READY
            self.save()
        queue_entry.save()

        block = IneligibleHostQueue(job=job, host=self)
        block.save()


    def platform(self):
        """Return this host's first platform label, or None if it has none."""
        # TODO(showard): slightly hacky?
        platforms = self.labels.filter(platform=True)
        if not platforms:
            return None
        return platforms[0]
    platform.short_description = 'Platform'


    @classmethod
    def check_no_platform(cls, hosts):
        """\
        Raise model_logic.ValidationError if any of the given hosts already
        has a platform label.
        """
        Host.objects.populate_relationships(hosts, Label, 'label_list')
        errors = []
        for host in hosts:
            platforms = [label.name for label in host.label_list
                         if label.platform]
            if platforms:
                # do a join, just in case this host has multiple platforms,
                # we'll be able to see it
                errors.append('Host %s already has a platform: %s' % (
                              host.hostname, ', '.join(platforms)))
        if errors:
            raise model_logic.ValidationError({'labels': '; '.join(errors)})


    def is_dead(self):
        """Return True if this host's status is 'Repair Failed'."""
        return self.status == Host.Status.REPAIR_FAILED


    def active_queue_entry(self):
        """\
        Return the single active queue entry for this host, or None if there
        is none.  More than one active entry trips an assertion.
        """
        active = list(self.hostqueueentry_set.filter(active=True))
        if not active:
            return None
        assert len(active) == 1, ('More than one active entry for '
                                  'host ' + self.hostname)
        return active[0]


    def _get_attribute_model_and_args(self, attribute):
        return HostAttribute, dict(host=self, attribute=attribute)


    class Meta:
        db_table = 'afe_hosts'

    def __unicode__(self):
        return unicode(self.hostname)


class HostAttribute(dbmodels.Model):
    """Arbitrary keyvals associated with hosts."""
    host = dbmodels.ForeignKey(Host)
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        db_table = 'afe_host_attributes'


class Test(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    author: author name
    description: description of the test
    name: test name
    time: short, medium, long
    test_class: This describes the class your test belongs in.
    test_category: This describes the category for your tests
    test_type: Client or Server
    path: path to pass to run_test()
    sync_count:  is a number >=1 (1 being the default). If it's 1, then it's an
                 async job. If it's >1 it's sync job for that number of machines
                 i.e. if sync_count = 2 it is a sync job that requires two
                 machines.
    Optional:
    dependencies: What the test requires to run. Comma delimited list
    dependency_labels: many-to-many relationship with labels corresponding to
                       test dependencies.
    experimental: If this is set to True production servers will ignore the test
    run_verify: Whether or not the scheduler should run the verify stage
    """
    TestTime = enum.Enum('SHORT', 'MEDIUM', 'LONG', start_value=1)
    # TODO(showard) - this should be merged with Job.ControlType (but right
    # now they use opposite values)

    name = dbmodels.CharField(max_length=255, unique=True)
    author = dbmodels.CharField(max_length=255)
    test_class = dbmodels.CharField(max_length=255)
    test_category = dbmodels.CharField(max_length=255)
    dependencies = dbmodels.CharField(max_length=255, blank=True)
    description = dbmodels.TextField(blank=True)
    experimental = dbmodels.BooleanField(default=True)
    run_verify = dbmodels.BooleanField(default=True)
    test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(),
                                           default=TestTime.MEDIUM)
    test_type = dbmodels.SmallIntegerField(
        choices=model_attributes.TestTypes.choices())
    sync_count = dbmodels.IntegerField(default=1)
    path = dbmodels.CharField(max_length=255, unique=True)

    dependency_labels = (
        dbmodels.ManyToManyField(Label, blank=True,
                                 db_table='afe_autotests_dependency_labels'))
    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def admin_description(self):
        """Return the XML-escaped description wrapped in a pre-style span."""
        escaped_description = saxutils.escape(self.description)
        return '<span style="white-space:pre">%s</span>' % escaped_description
    admin_description.allow_tags = True
    admin_description.short_description = 'Description'


    class Meta:
        db_table = 'afe_autotests'

    def __unicode__(self):
        return unicode(self.name)


class Profiler(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    name: profiler name

    Optional:
    description: arbitrary text description
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    class Meta:
        db_table = 'afe_profilers'

    def __unicode__(self):
        return unicode(self.name)


class AclGroup(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    name: name of ACL group

    Optional:
    description: arbitrary description of group
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.CharField(max_length=255, blank=True)
    users = dbmodels.ManyToManyField(User, blank=False,
                                     db_table='afe_acl_groups_users')
    hosts = dbmodels.ManyToManyField(Host, blank=True,
                                     db_table='afe_acl_groups_hosts')

    name_field = 'name'
    objects = model_logic.ExtendedManager()

    @staticmethod
    def check_for_acl_violation_hosts(hosts):
        """\
        Raise AclAccessViolation if the current user lacks ACL access to any
        of the given hosts.  Superusers always pass.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        accessible_host_ids = set(
            host.id for host in Host.objects.filter(aclgroup__users=user))
        for host in hosts:
            # Check if the user has access to this host,
            # but only if it is not a metahost or a one-time-host
            no_access = (isinstance(host, Host)
                         and not host.invalid
                         and int(host.id) not in accessible_host_ids)
            if no_access:
                raise AclAccessViolation("%s does not have access to %s" %
                                         (str(user), str(host)))


    @staticmethod
    def check_abort_permissions(queue_entries):
        """
        look for queue entries that aren't abortable, meaning
         * the job isn't owned by this user, and
           * the machine isn't ACL-accessible, or
           * the machine is in the "Everyone" ACL
        """
        user = User.current_user()
        if user.is_superuser():
            return
        not_owned = queue_entries.exclude(job__owner=user.login)
        # I do this using ID sets instead of just Django filters because
        # filtering on M2M dbmodels is broken in Django 0.96. It's better in
        # 1.0.
        # TODO: Use Django filters, now that we're using 1.0.
        accessible_ids = set(
            entry.id for entry
            in not_owned.filter(host__aclgroup__users__login=user.login))
        public_ids = set(entry.id for entry
                         in not_owned.filter(host__aclgroup__name='Everyone'))
        cannot_abort = [entry for entry in not_owned.select_related()
                        if entry.id not in accessible_ids
                        or entry.id in public_ids]
        if not cannot_abort:
            return
        entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner,
                                              entry.host_or_metahost_name())
                                for entry in cannot_abort)
        raise AclAccessViolation('You cannot abort the following job entries: '
                                 + entry_names)


    def check_for_acl_violation_acl_group(self):
        """\
        Raise AclAccessViolation if the current user may not modify this
        group: only superusers may modify 'Everyone', and non-superusers must
        be members of the group.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        if self.name == 'Everyone':
            raise AclAccessViolation("You cannot modify 'Everyone'!")
        if user not in self.users.all():
            raise AclAccessViolation("You do not have access to %s"
                                     % self.name)

    @staticmethod
    def on_host_membership_change():
        """\
        Re-normalize membership of the 'Everyone' group: valid hosts in no
        ACL group are added to it, and valid hosts that are also in another
        ACL group are removed from it.
        """
        everyone = AclGroup.objects.get(name='Everyone')

        # find hosts that aren't in any ACL group and add them to Everyone
        # TODO(showard): this is a bit of a hack, since the fact that this query
        # works is kind of a coincidence of Django internals.  This trick
        # doesn't work in general (on all foreign key relationships).  I'll
        # replace it with a better technique when the need arises.
        orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True)
        everyone.hosts.add(*orphaned_hosts.distinct())

        # find hosts in both Everyone and another ACL group, and remove them
        # from Everyone
        hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone')
        acled_hosts = set()
        for host in hosts_in_everyone:
            # Has an ACL group other than Everyone
            if host.aclgroup_set.count() > 1:
                acled_hosts.add(host)
        everyone.hosts.remove(*acled_hosts)


    def delete(self):
        """\
        Delete this ACL group and re-normalize 'Everyone' membership.

        Raises AclAccessViolation for the 'Everyone' group or if the current
        user may not modify this group.
        """
        if self.name == 'Everyone':
            raise AclAccessViolation("You cannot delete 'Everyone'!")
        self.check_for_acl_violation_acl_group()
        super(AclGroup, self).delete()
        self.on_host_membership_change()


    def add_current_user_if_empty(self):
        """Add the current user to this group if it has no users."""
        if not self.users.count():
            self.users.add(User.current_user())


    def perform_after_save(self, change):
        """\
        Post-save bookkeeping.  `change` is False when the group was newly
        created, in which case the current user is added to it.
        """
        if not change:
            self.users.add(User.current_user())
        self.add_current_user_if_empty()
        self.on_host_membership_change()


    def save(self, *args, **kwargs):
        """Save this group, ACL-checking the stored version if it exists."""
        change = bool(self.id)
        if change:
            # Check the original object for an ACL violation
            AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group()
        super(AclGroup, self).save(*args, **kwargs)
        self.perform_after_save(change)


    class Meta:
        db_table = 'afe_acl_groups'

    def __unicode__(self):
        return unicode(self.name)


class JobManager(model_logic.ExtendedManager):
    'Custom manager to provide efficient status counts querying.'
    def get_status_counts(self, job_ids):
        """\
        Returns a dictionary mapping the given job IDs to their status
        count dictionaries.
        """
        if not job_ids:
            return {}
        # Materialize once: the ids are needed for the placeholder count, the
        # query parameters and the result dictionary.
        job_ids = list(job_ids)
        # Let the database driver quote the ids rather than interpolating
        # caller-supplied values into the SQL text.
        placeholders = ','.join(['%s'] * len(job_ids))
        cursor = connection.cursor()
        cursor.execute("""
            SELECT job_id, status, aborted, complete, COUNT(*)
            FROM afe_host_queue_entries
            WHERE job_id IN (%s)
            GROUP BY job_id, status, aborted, complete
            """ % placeholders, job_ids)
        all_job_counts = dict((job_id, {}) for job_id in job_ids)
        for job_id, status, aborted, complete, count in cursor.fetchall():
            job_dict = all_job_counts[job_id]
            full_status = HostQueueEntry.compute_full_status(status, aborted,
                                                             complete)
            job_dict.setdefault(full_status, 0)
            job_dict[full_status] += count
        return all_job_counts


class Job(dbmodels.Model, model_logic.ModelExtensions):
    """\
    owner: username of job owner
    name: job name (does not have to be unique)
    priority: Low, Medium, High, Urgent (or 0-3)
    control_file: contents of control file
    control_type: Client or Server
    created_on: date of job creation
    submitted_on: date of job submission
    synch_count: how many hosts should be used per autoserv execution
    run_verify: Whether or not to run the verify phase
    timeout: hours from queuing time until job times out
    max_runtime_hrs: hours from job starting time until job times out
    email_list: list of people to email on completion delimited by any of:
                white space, ',', ':', ';'
    dependency_labels: many-to-many relationship with labels corresponding to
                       job dependencies
    reboot_before: Never, If dirty, or Always
    reboot_after: Never, If all tests passed, or Always
    parse_failed_repair: if True, a failed repair launched by this job will have
    its results parsed as part of the job.
    """
    DEFAULT_TIMEOUT = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_default', default=240)
    DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72)
    DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool,
        default=False)

    Priority = enum.Enum('Low', 'Medium', 'High', 'Urgent')
    ControlType = enum.Enum('Server', 'Client', start_value=1)

    owner = dbmodels.CharField(max_length=255)
    name = dbmodels.CharField(max_length=255)
    priority = dbmodels.SmallIntegerField(choices=Priority.choices(),
                                          blank=True, # to allow 0
                                          default=Priority.MEDIUM)
    control_file = dbmodels.TextField()
    control_type = dbmodels.SmallIntegerField(choices=ControlType.choices(),
                                              blank=True, # to allow 0
                                              default=ControlType.CLIENT)
    created_on = dbmodels.DateTimeField()
    synch_count = dbmodels.IntegerField(null=True, default=1)
    timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT)
    run_verify = dbmodels.BooleanField(default=True)
    email_list = dbmodels.CharField(max_length=250, blank=True)
    dependency_labels = (
            dbmodels.ManyToManyField(Label, blank=True,
                                     db_table='afe_jobs_dependency_labels'))
    reboot_before = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootBefore.choices(), blank=True,
        default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootAfter.choices(), blank=True,
        default=DEFAULT_REBOOT_AFTER)
    parse_failed_repair = dbmodels.BooleanField(
        default=DEFAULT_PARSE_FAILED_REPAIR)
    max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS)


    # custom manager
    objects = JobManager()


    def is_server_job(self):
        """Return True if this job's control type is ControlType.SERVER."""
        return self.control_type == self.ControlType.SERVER


    @classmethod
    def create(cls, owner, options, hosts):
        """\
        Creates a job by taking some information (the listed args)
        and filling in the rest of the necessary information.

        `options` must contain 'name', 'priority', 'control_file',
        'control_type', 'dependencies' and 'keyvals'; the remaining keys are
        optional.  Missing reboot_before/reboot_after entries are filled in
        from the current user's preferences, which modifies `options`.
        """
        AclGroup.check_for_acl_violation_hosts(hosts)

        user = User.current_user()
        if options.get('reboot_before') is None:
            options['reboot_before'] = user.get_reboot_before_display()
        if options.get('reboot_after') is None:
            options['reboot_after'] = user.get_reboot_after_display()

        job = cls.add_object(
            owner=owner,
            name=options['name'],
            priority=options['priority'],
            control_file=options['control_file'],
            control_type=options['control_type'],
            synch_count=options.get('synch_count'),
            timeout=options.get('timeout'),
            max_runtime_hrs=options.get('max_runtime_hrs'),
            run_verify=options.get('run_verify'),
            email_list=options.get('email_list'),
            reboot_before=options.get('reboot_before'),
            reboot_after=options.get('reboot_after'),
            parse_failed_repair=options.get('parse_failed_repair'),
            created_on=datetime.now())

        job.dependency_labels = options['dependencies']

        if options['keyvals'] is not None:
            for key, value in options['keyvals'].iteritems():
                JobKeyval.objects.create(job=job, key=key, value=value)

        return job


    def queue(self, hosts, atomic_group=None, is_template=False):
        """Enqueue a job on the given hosts."""
        if not hosts:
            if atomic_group:
                # No hosts or labels are required to queue an atomic group
                # Job.  However, if they are given, we respect them below.
                atomic_group.enqueue_job(self, is_template=is_template)
            else:
                # hostless job
                entry = HostQueueEntry.create(job=self, is_template=is_template)
                entry.save()
            return

        for host in hosts:
            host.enqueue_job(self, atomic_group=atomic_group,
                             is_template=is_template)


    def create_recurring_job(self, start_date, loop_period, loop_count, owner):
        """\
        Create and save a RecurringRun that uses this job, owned by the user
        whose login is `owner`.  Returns the new RecurringRun's id.
        """
        rec = RecurringRun(job=self, start_date=start_date,
                           loop_period=loop_period,
                           loop_count=loop_count,
                           owner=User.objects.get(login=owner))
        rec.save()
        return rec.id


    def user(self):
        """\
        Return the User whose login matches this job's owner, or None if no
        such user exists.
        """
        try:
            return User.objects.get(login=self.owner)
        except User.DoesNotExist:
            # The lookup is on User, so it is User.DoesNotExist (not
            # Job.DoesNotExist) that signals a missing owner.
            return None


    def abort(self):
        """Abort every host queue entry of this job."""
        for queue_entry in self.hostqueueentry_set.all():
            queue_entry.abort()


    def tag(self):
        """Return the '<id>-<owner>' tag for this job."""
        return '%s-%s' % (self.id, self.owner)


    def keyval_dict(self):
        """Return this job's keyvals as a {key: value} dictionary."""
        return dict((keyval.key, keyval.value)
                    for keyval in self.jobkeyval_set.all())


    class Meta:
        db_table = 'afe_jobs'

    def __unicode__(self):
        return u'%s (%s-%s)' % (self.name, self.id, self.owner)


class JobKeyval(dbmodels.Model, model_logic.ModelExtensions):
    """Keyvals associated with jobs"""
    job = dbmodels.ForeignKey(Job)
    key = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        db_table = 'afe_job_keyvals'


class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions):
    job = dbmodels.ForeignKey(Job)
    host = dbmodels.ForeignKey(Host)

    objects = model_logic.ExtendedManager()

    class Meta:
        db_table = 'afe_ineligible_host_queues'


class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
    Status = host_queue_entry_states.Status
    ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES
    COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES

    job = dbmodels.ForeignKey(Job)
    host = dbmodels.ForeignKey(Host, blank=True, null=True)
    status = dbmodels.CharField(max_length=255)
    meta_host = dbmodels.ForeignKey(Label, blank=True, null=True,
                                    db_column='meta_host')
    active = dbmodels.BooleanField(default=False)
    complete = dbmodels.BooleanField(default=False)
    deleted = dbmodels.BooleanField(default=False)
    execution_subdir = dbmodels.CharField(max_length=255, blank=True,
                                          default='')
    # If atomic_group is set, this is a virtual HostQueueEntry that will
    # be expanded into many actual hosts within the group at schedule time.
    atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True)
    aborted = dbmodels.BooleanField(default=False)
    started_on = dbmodels.DateTimeField(null=True, blank=True)

    objects = model_logic.ExtendedManager()


    def __init__(self, *args, **kwargs):
        super(HostQueueEntry, self).__init__(*args, **kwargs)
        # track 'status' so on_attribute_changed() can report changes
        self._record_attributes(['status'])


    @classmethod
    def create(cls, job, host=None, meta_host=None, atomic_group=None,
               is_template=False):
        """\
        Build (without saving) a queue entry for the job.  The initial status
        is TEMPLATE when is_template is set, otherwise QUEUED.
        """
        if is_template:
            status = cls.Status.TEMPLATE
        else:
            status = cls.Status.QUEUED

        return cls(job=job, host=host, meta_host=meta_host,
                   atomic_group=atomic_group, status=status)


    def save(self, *args, **kwargs):
        """Save this entry after syncing active/complete with the status."""
        self._set_active_and_complete()
        super(HostQueueEntry, self).save(*args, **kwargs)
        self._check_for_updated_attributes()


    def execution_path(self):
        """
        Path to this entry's results (relative to the base results directory).
        """
        return os.path.join(self.job.tag(), self.execution_subdir)


    def host_or_metahost_name(self):
        """\
        Return the hostname, else the meta_host label name, else the atomic
        group name.  An entry with none of the three trips an assertion.
        """
        if self.host:
            return self.host.hostname
        elif self.meta_host:
            return self.meta_host.name
        else:
            assert self.atomic_group, "no host, meta_host or atomic group!"
            return self.atomic_group.name


    def _set_active_and_complete(self):
        # derive the active/complete flags from the current status
        if self.status in self.ACTIVE_STATUSES:
            self.active, self.complete = True, False
        elif self.status in self.COMPLETE_STATUSES:
            self.active, self.complete = False, True
        else:
            self.active, self.complete = False, False


    def on_attribute_changed(self, attribute, old_value):
        """Log the new status; 'status' is the only attribute tracked."""
        assert attribute == 'status'
        logging.info('%s/%d (%d) -> %s' % (self.host, self.job.id, self.id,
                                           self.status))


    def is_meta_host_entry(self):
        'True if this entry has a meta_host instead of a host.'
        return self.host is None and self.meta_host is not None


    def log_abort(self, user):
        """Record that `user` aborted this entry."""
        abort_log = AbortedHostQueueEntry(queue_entry=self, aborted_by=user)
        abort_log.save()


    def abort(self):
        """\
        Mark this entry as aborted and log the abort, unless it is already
        complete or aborted.
        """
        # this isn't completely immune to race conditions since it's not atomic,
        # but it should be safe given the scheduler's behavior.
        if not self.complete and not self.aborted:
            self.log_abort(User.current_user())
            self.aborted = True
            self.save()


    @classmethod
    def compute_full_status(cls, status, aborted, complete):
        """\
        Return 'Aborted (<status>)' for an entry that is aborted but not yet
        complete; otherwise return the status unchanged.
        """
        if aborted and not complete:
            return 'Aborted (%s)' % status
        return status


    def full_status(self):
        """Return compute_full_status() for this entry's own fields."""
        return self.compute_full_status(self.status, self.aborted,
                                        self.complete)


    def _postprocess_object_dict(self, object_dict):
        object_dict['full_status'] = self.full_status()


    class Meta:
        db_table = 'afe_host_queue_entries'



    def __unicode__(self):
        hostname = None
        if self.host:
            hostname = self.host.hostname
        return u"%s/%d (%d)" % (hostname, self.job.id, self.id)


class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
    queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True)
    aborted_by = dbmodels.ForeignKey(User)
    aborted_on = dbmodels.DateTimeField()

    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save this record, stamping aborted_on with the current time."""
        self.aborted_on = datetime.now()
        super(AbortedHostQueueEntry, self).save(*args, **kwargs)

    class Meta:
        db_table = 'afe_aborted_host_queue_entries'


class RecurringRun(dbmodels.Model, model_logic.ModelExtensions):
    """\
    job: job to use as a template
    owner: owner of the instantiated template
    start_date: Run the job at scheduled date
    loop_period: Re-run (loop) the job periodically
                 (in every loop_period seconds)
    loop_count: Re-run (loop) count
    """

    job = dbmodels.ForeignKey(Job)
    owner = dbmodels.ForeignKey(User)
    start_date = dbmodels.DateTimeField()
    loop_period = dbmodels.IntegerField(blank=True)
    loop_count = dbmodels.IntegerField(blank=True)

    objects = model_logic.ExtendedManager()

    class Meta:
        db_table = 'afe_recurring_run'

    def __unicode__(self):
        return u'RecurringRun(job %s, start %s, period %s, count %s)' % (
            self.job.id, self.start_date, self.loop_period, self.loop_count)


class SpecialTask(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Tasks to run on hosts at the next time they are in the Ready state. Use this
    for high-priority tasks, such as forced repair or forced reinstall.

    host: host to run this task on
    task: special task to run
    time_requested: date and time the request for this task was made
    is_active: task is currently running
    is_complete: task has finished running
    time_started: date and time the task started
    queue_entry: Host queue entry waiting on this task (or None, if task was not
                 started in preparation of a job)
    """
    Task = enum.Enum('Verify', 'Cleanup', 'Repair', string_values=True)

    host = dbmodels.ForeignKey(Host, blank=False, null=False)
    task = dbmodels.CharField(max_length=64, choices=Task.choices(),
                              blank=False, null=False)
    requested_by = dbmodels.ForeignKey(User, blank=True, null=True)
    time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False,
                                            null=False)
    is_active = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_complete = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_started = dbmodels.DateTimeField(null=True, blank=True)
    queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True)
    success = dbmodels.BooleanField(default=False, blank=False, null=False)

    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """\
        Save this task.  When it is tied to a queue entry, requested_by is
        set to the owner of that entry's job.
        """
        if self.queue_entry:
            self.requested_by = User.objects.get(
                    login=self.queue_entry.job.owner)
        super(SpecialTask, self).save(*args, **kwargs)


    def execution_path(self):
        """@see HostQueueEntry.execution_path()"""
        return 'hosts/%s/%s-%s' % (self.host.hostname, self.id,
                                   self.task.lower())


    # property to emulate HostQueueEntry.status
    @property
    def status(self):
        """
        Return a host queue entry status appropriate for this task.  Although
        SpecialTasks are not HostQueueEntries, it is helpful to the user to
        present similar statuses.
        """
        if self.is_complete:
            if self.success:
                return HostQueueEntry.Status.COMPLETED
            return HostQueueEntry.Status.FAILED
        if self.is_active:
            return HostQueueEntry.Status.RUNNING
        return HostQueueEntry.Status.QUEUED


    # property to emulate HostQueueEntry.started_on
    @property
    def started_on(self):
        return self.time_started


    @classmethod
    def schedule_special_task(cls, host, task):
        """
        Schedules a special task on a host if the task is not already scheduled.
        """
        existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task,
                                                    is_active=False,
                                                    is_complete=False)
        if existing_tasks:
            return existing_tasks[0]

        special_task = SpecialTask(host=host, task=task,
                                   requested_by=User.current_user())
        special_task.save()
        return special_task


    def activate(self):
        """
        Sets a task as active and sets the time started to the current time.
        """
        logging.info('Starting: %s', self)
        self.is_active = True
        self.time_started = datetime.now()
        self.save()


    def finish(self, success):
        """
        Sets a task as completed
        """
        logging.info('Finished: %s', self)
        self.is_active = False
        self.is_complete = True
        self.success = success
        self.save()


    class Meta:
        db_table = 'afe_special_tasks'


    def __unicode__(self):
        result = u'Special Task %s (host %s, task %s, time %s)' % (
            self.id, self.host, self.task, self.time_requested)
        if self.is_complete:
            result += u' (completed)'
        elif self.is_active:
            result += u' (active)'

        return result