1# pylint: disable=missing-docstring
2
3import logging
4from datetime import datetime
5import django.core
6try:
7    from django.db import models as dbmodels, connection
8except django.core.exceptions.ImproperlyConfigured:
9    raise ImportError('Django database not yet configured. Import either '
10                       'setup_django_environment or '
11                       'setup_django_lite_environment from '
12                       'autotest_lib.frontend before any imports that '
13                       'depend on django models.')
14from xml.sax import saxutils
15import common
16from autotest_lib.frontend.afe import model_logic, model_attributes
17from autotest_lib.frontend.afe import rdb_model_extensions
18from autotest_lib.frontend import settings, thread_local
19from autotest_lib.client.common_lib import enum, error, host_protections
20from autotest_lib.client.common_lib import global_config
21from autotest_lib.client.common_lib import host_queue_entry_states
22from autotest_lib.client.common_lib import control_data, priorities, decorators
23from autotest_lib.client.common_lib import utils
24from autotest_lib.server import utils as server_utils
25
# job options and user preferences
DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY
# Bug fix: this previously referenced RebootBefore.NEVER. Both enums start
# at the same value for NEVER, so the stored default is unchanged, but the
# "reboot after" preference should come from the RebootAfter enum.
DEFAULT_REBOOT_AFTER = model_attributes.RebootAfter.NEVER
29
30
class AclAccessViolation(Exception):
    """\
    Raised when an operation is attempted without proper permissions as
    dictated by ACLs.
    """
36
37
class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model):
    """\
    A collection of hosts that must be scheduled all at once.

    A host whose label is tied to an atomic group is only ever scheduled
    for a job together with the other hosts sharing that label.

    Required:
      name: Name for this atomic group, e.g. 'rack23' or 'funky_net'.
      max_number_of_machines: Upper bound on how many machines are
              scheduled at once for this group.  job.synch_count acts as
              the minimum.

    Optional:
      description: Free-form text describing this group's purpose.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)
    # A "large" magic default keeps the scheduler logic simple: atomic
    # groups are usually meant to consume every machine in the group, with
    # any narrowing done via dependency labels instead.
    # TODO(dennisjeffrey): Revisit this so we don't have to assume that
    # "infinity" is around 3.3 million.
    INFINITE_MACHINES = 333333333
    max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES)
    invalid = dbmodels.BooleanField(default=False,
                                  editable=settings.FULL_ADMIN)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on this atomic group of hosts.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        HostQueueEntry.create(atomic_group=self, job=job,
                              is_template=is_template).save()


    def clean_object(self):
        """Detach all labels when this group is invalidated."""
        self.label_set.clear()


    class Meta:
        """Metadata for class AtomicGroup."""
        db_table = 'afe_atomic_groups'


    def __unicode__(self):
        return unicode(self.name)
93
94
class Label(model_logic.ModelWithInvalid, dbmodels.Model):
    """\
    A host/test label.

    Required:
      name: label name

    Optional:
      kernel_config: URL/path to kernel config for jobs run on this label.
      platform: True if this label denotes a platform (defaults to False).
      only_if_needed: When True, hosts carrying this label may only be
              used if the job/test explicitly requests the label (as the
              meta_host or via job_dependencies).
      atomic_group: The atomic group associated with this label.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    kernel_config = dbmodels.CharField(max_length=255, blank=True)
    platform = dbmodels.BooleanField(default=False)
    invalid = dbmodels.BooleanField(default=False,
                                    editable=settings.FULL_ADMIN)
    only_if_needed = dbmodels.BooleanField(default=False)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True)


    def clean_object(self):
        """Detach hosts and tests when this label is invalidated."""
        self.host_set.clear()
        self.test_set.clear()


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on any host carrying this label.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        entry = HostQueueEntry.create(job=job, meta_host=self,
                                      is_template=is_template)
        entry.save()



    class Meta:
        """Metadata for class Label."""
        db_table = 'afe_labels'


    def __unicode__(self):
        return unicode(self.name)
145
146
class Shard(dbmodels.Model, model_logic.ModelExtensions):
    """A shard of the scheduler, identified by hostname and label set."""

    hostname = dbmodels.CharField(max_length=255, unique=True)

    name_field = 'hostname'

    labels = dbmodels.ManyToManyField(Label, blank=True,
                                      db_table='afe_shards_labels')

    class Meta:
        """Metadata for class Shard."""
        db_table = 'afe_shards'


    def rpc_hostname(self):
        """Get the rpc hostname of the shard.

        @return: Just the shard hostname for all non-testing environments.
                 The address of the default gateway for vm testing environments.
        """
        # TODO: Figure out a better solution for testing. Since no 2 shards
        # can run on the same host, a shard hostname of localhost implies a
        # VM in a test cluster (puppylab), where each shard VM is named
        # localhost:<port>. That form is needed for correct afe
        # links/redirection from the frontend (which happens through the
        # host), but rpcs performed *on* the shard must instead target the
        # gateway address. Real cluster shard nodes never use 'localhost',
        # so their hostname is returned untouched.
        if not utils.is_puppylab_vm(self.hostname):
            return self.hostname
        bare_host = self.hostname.split(':')[0]
        return self.hostname.replace(bare_host, utils.DEFAULT_VM_GATEWAY)
185
186
class Drone(dbmodels.Model, model_logic.ModelExtensions):
    """
    A scheduler drone

    hostname: the drone's hostname
    """
    hostname = dbmodels.CharField(max_length=255, unique=True)

    name_field = 'hostname'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Persist the drone; restricted to superusers."""
        user = User.current_user()
        if not user.is_superuser():
            raise Exception('Only superusers may edit drones')
        super(Drone, self).save(*args, **kwargs)


    def delete(self):
        """Remove the drone; restricted to superusers."""
        user = User.current_user()
        if not user.is_superuser():
            raise Exception('Only superusers may delete drones')
        super(Drone, self).delete()


    class Meta:
        """Metadata for class Drone."""
        db_table = 'afe_drones'

    def __unicode__(self):
        return unicode(self.hostname)
217
218
class DroneSet(dbmodels.Model, model_logic.ModelExtensions):
    """
    A set of scheduler drones

    The scheduler consults these to decide which drones a job may run on.

    name: the drone set's name
    drones: the drones that are part of the set
    """
    DRONE_SETS_ENABLED = global_config.global_config.get_config_value(
            'SCHEDULER', 'drone_sets_enabled', type=bool, default=False)
    DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value(
            'SCHEDULER', 'default_drone_set_name', default=None)

    name = dbmodels.CharField(max_length=255, unique=True)
    drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones')

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Persist the drone set; restricted to superusers."""
        requester = User.current_user()
        if not requester.is_superuser():
            raise Exception('Only superusers may edit drone sets')
        super(DroneSet, self).save(*args, **kwargs)


    def delete(self):
        """Remove the drone set; restricted to superusers."""
        requester = User.current_user()
        if not requester.is_superuser():
            raise Exception('Only superusers may delete drone sets')
        super(DroneSet, self).delete()


    @classmethod
    def drone_sets_enabled(cls):
        """Returns whether drone sets are enabled.

        @param cls: Implicit class object.
        """
        return cls.DRONE_SETS_ENABLED


    @classmethod
    def default_drone_set_name(cls):
        """Returns the default drone set name.

        @param cls: Implicit class object.
        """
        return cls.DEFAULT_DRONE_SET_NAME


    @classmethod
    def get_default(cls):
        """Gets the default drone set, compatible with Job.add_object.

        @param cls: Implicit class object.
        """
        return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME)


    @classmethod
    def resolve_name(cls, drone_set_name):
        """
        Resolve a drone set name, in order of preference:
        1) the drone set given,
        2) the current user's default drone set, or
        3) the global default drone set

        Returns None when drone sets are disabled.

        @param cls: Implicit class object.
        @param drone_set_name: A drone set name.
        """
        if not cls.drone_sets_enabled():
            return None

        # current_user() is called unconditionally: it may lazily create
        # the system user record as a side effect.
        user = User.current_user()
        if drone_set_name:
            return drone_set_name
        if user.drone_set and user.drone_set.name:
            return user.drone_set.name
        return cls.get_default().name


    def get_drone_hostnames(self):
        """
        Gets the hostnames of all drones in this drone set
        """
        hostnames = self.drones.all().values_list('hostname', flat=True)
        return set(hostnames)


    class Meta:
        """Metadata for class DroneSet."""
        db_table = 'afe_drone_sets'

    def __unicode__(self):
        return unicode(self.name)
315
316
class User(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    login :user login name

    Optional:
    access_level: 0=User (default), 1=Admin, 100=Root
    """
    ACCESS_ROOT = 100
    ACCESS_ADMIN = 1
    ACCESS_USER = 0

    AUTOTEST_SYSTEM = 'autotest_system'

    login = dbmodels.CharField(max_length=255, unique=True)
    access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True)

    # user preferences
    reboot_before = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootBefore.choices(), blank=True,
        default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootAfter.choices(), blank=True,
        default=DEFAULT_REBOOT_AFTER)
    drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)
    show_experimental = dbmodels.BooleanField(default=False)

    name_field = 'login'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Persist the user.

        Non-superusers may only modify their own record.  A newly created
        user is automatically added to the 'Everyone' ACL group.
        """
        # Record whether this row exists yet, before the save assigns an id.
        is_new = self.id is None
        requester = thread_local.get_user()
        if (requester and not requester.is_superuser()
                and requester.login != self.login):
            raise AclAccessViolation("You cannot modify user " + self.login)
        super(User, self).save(*args, **kwargs)
        if is_new:
            AclGroup.objects.get(name='Everyone').users.add(self)


    def is_superuser(self):
        """Returns whether the user has superuser access."""
        return self.access_level >= self.ACCESS_ROOT


    @classmethod
    def current_user(cls):
        """Returns the current user, creating the system user if needed.

        @param cls: Implicit class object.
        """
        user = thread_local.get_user()
        if user is not None:
            return user
        user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM)
        user.access_level = cls.ACCESS_ROOT
        user.save()
        return user


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Look for a record matching both id and login and return it if
        found.  Otherwise one of two conflicts may exist:

        1. Same id, different login:
            We received: "1 chromeos-test"
            And we have: "1 debug-user"
        Here "1 debug-user" must be deleted before "1 chromeos-test" can
        be inserted.

        2. Same login, different id:
            We received: "1 chromeos-test"
            And we have: "2 chromeos-test"
        Here "2 chromeos-test" must be deleted before "1 chromeos-test"
        can be inserted.

        This method deletes such conflicting records and re-raises
        DoesNotExist so the caller creates the new record.

        @raises: DoesNotExist, if a record with the matching login and id
                does not exist.
        """

        # Both the id and login should be unique, but we might already have
        # a user with the same login/id because current_user proactively
        # creates a user record when one doesn't exist. To avoid conflict
        # between the master and shard, delete any existing user records
        # that don't match what we're about to deserialize from the master.
        try:
            return cls.objects.get(login=data['login'], id=data['id'])
        except cls.DoesNotExist:
            cls.delete_matching_record(login=data['login'])
            cls.delete_matching_record(id=data['id'])
            raise


    class Meta:
        """Metadata for class User."""
        db_table = 'afe_users'

    def __unicode__(self):
        return unicode(self.login)
426
427
class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel,
           model_logic.ModelWithAttributes):
    """\
    Required:
    hostname

    optional:
    locked: if true, host is locked and will not be queued

    Internal:
    From AbstractHostModel:
        status: string describing status of host
        invalid: true if the host has been deleted
        protection: indicates what can be done to this host during repair
        lock_time: DateTime at which the host was locked
        dirty: true if the host has been used without being rebooted
    Local:
        locked_by: user that locked the host, or null if the host is unlocked
    """

    # Related objects serialized along with a Host and followed when
    # deserializing.
    SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set',
                                         'hostattribute_set',
                                         'labels',
                                         'shard'])
    # Local fields that may be overwritten by an incoming update.
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid'])


    def custom_deserialize_relation(self, link, data):
        """Deserialize the 'shard' relation (the only custom-handled link).

        @param link: Name of the link being deserialized; must be 'shard'.
        @param data: Serialized Shard data.
        """
        assert link == 'shard', 'Link %s should not be deserialized' % link
        self.shard = Shard.deserialize(data)


    # Note: Only specify foreign keys here, specify all native host columns in
    # rdb_model_extensions instead.
    Protection = host_protections.Protection
    labels = dbmodels.ManyToManyField(Label, blank=True,
                                      db_table='afe_hosts_labels')
    locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False)
    name_field = 'hostname'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    leased_objects = model_logic.LeasedHostManager()

    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    def __init__(self, *args, **kwargs):
        super(Host, self).__init__(*args, **kwargs)
        # Track 'status' so on_attribute_changed fires (and logs) whenever
        # it is modified between saves.
        self._record_attributes(['status'])


    @staticmethod
    def create_one_time_host(hostname):
        """Creates a one-time host.

        The host is created (or resurrected) as invalid, unlocked, and with
        DO_NOT_REPAIR protection.  Raises a ValidationError if a valid host
        with the same hostname already exists in the DB.

        @param hostname: The name for the host.
        """
        query = Host.objects.filter(hostname=hostname)
        if query.count() == 0:
            host = Host(hostname=hostname, invalid=True)
            host.do_validate()
        else:
            host = query[0]
            if not host.invalid:
                raise model_logic.ValidationError({
                    'hostname' : '%s already exists in the autotest DB.  '
                        'Select it rather than entering it as a one time '
                        'host.' % hostname
                    })
        host.protection = host_protections.Protection.DO_NOT_REPAIR
        host.locked = False
        host.save()
        # Detach ACL groups and labels so a recycled invalid host doesn't
        # carry stale associations.
        host.clean_object()
        return host


    @classmethod
    def _assign_to_shard_nothing_helper(cls):
        """Does nothing.

        This method is called in the middle of assign_to_shard, and does
        nothing. It exists to allow integration tests to simulate a race
        condition."""


    @classmethod
    def assign_to_shard(cls, shard, known_ids):
        """Assigns hosts to a shard.

        For all labels that have been assigned to a shard, all hosts that
        have at least one of the shard's labels are assigned to the shard.
        Hosts that are assigned to the shard but aren't already present on the
        shard are returned.

        Any host ids that are in |known_ids| but that do not belong to the
        shard are incorrect ids, which are also returned so that the shard
        can remove them locally.

        Board to shard mapping is many-to-one. Many different boards can be
        hosted in a shard. However, DUTs of a single board cannot be distributed
        into more than one shard.

        @param shard: The shard object to assign labels/hosts for.
        @param known_ids: List of all host-ids the shard already knows.
                          This is used to figure out which hosts should be sent
                          to the shard. If shard_ids were used instead, hosts
                          would only be transferred once, even if the client
                          failed persisting them.
                          The number of hosts usually lies in O(100), so the
                          overhead is acceptable.

        @returns a tuple of (hosts objects that should be sent to the shard,
                             incorrect host ids that should not belong to the
                             shard)
        """
        # Disclaimer: concurrent heartbeats should theoretically not occur in
        # the current setup. As they may be introduced in the near future,
        # this comment will be left here.

        # Sending stuff twice is acceptable, but forgetting something isn't.
        # Detecting duplicates on the client is easy, but here it's harder. The
        # following options were considered:
        # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more
        #   than select returned, as concurrently more hosts might have been
        #   inserted
        # - UPDATE and then SELECT WHERE shard=shard: select always returns all
        #   hosts for the shard, this is overhead
        # - SELECT and then UPDATE only selected without requerying afterwards:
        #   returns the old state of the records.
        new_hosts = []

        # Candidate hosts: unleased, carrying at least one shard label, and
        # not already known to the shard.
        possible_new_host_ids = set(Host.objects.filter(
            labels__in=shard.labels.all(),
            leased=False
            ).exclude(
            id__in=known_ids,
            ).values_list('pk', flat=True))

        # No-op in production, used to simulate race condition in tests.
        cls._assign_to_shard_nothing_helper()

        if possible_new_host_ids:
            # Re-filter on labels/leased inside the UPDATE so hosts whose
            # state changed since the SELECT above are not claimed.
            Host.objects.filter(
                pk__in=possible_new_host_ids,
                labels__in=shard.labels.all(),
                leased=False
                ).update(shard=shard)
            # Requery to return the post-update state; only hosts actually
            # claimed by the UPDATE (shard=shard) are reported as new.
            new_hosts = list(Host.objects.filter(
                pk__in=possible_new_host_ids,
                shard=shard
                ).all())

        # Ids the shard knows about that are no longer assigned to it.
        invalid_host_ids = list(Host.objects.filter(
            id__in=known_ids
            ).exclude(
            shard=shard
            ).values_list('pk', flat=True))

        return new_hosts, invalid_host_ids

    def resurrect_object(self, old_object):
        """Restore a previously-invalidated host record.

        @param old_object: The invalid Host record being revived.
        """
        super(Host, self).resurrect_object(old_object)
        # invalid hosts can be in use by the scheduler (as one-time hosts), so
        # don't change the status
        self.status = old_object.status


    def clean_object(self):
        """Detach ACL groups and labels when this host is invalidated."""
        self.aclgroup_set.clear()
        self.labels.clear()


    def save(self, *args, **kwargs):
        """Persist the host, enforcing ACLs and maintaining lock metadata.

        On first save the host is added to the 'Everyone' ACL group.
        Transitions of the 'locked' flag keep locked_by/lock_time in sync.
        """
        # extra spaces in the hostname can be a sneaky source of errors
        self.hostname = self.hostname.strip()
        # is this a new object being saved for the first time?
        first_time = (self.id is None)
        if not first_time:
            AclGroup.check_for_acl_violation_hosts([self])
        # If locked is changed, send its status and user made the change to
        # metaDB. Locks are important in host history because if a device is
        # locked then we don't really care what state it is in.
        if self.locked and not self.locked_by:
            self.locked_by = User.current_user()
            if not self.lock_time:
                self.lock_time = datetime.now()
            self.dirty = True
        elif not self.locked and self.locked_by:
            # Host was unlocked: clear the lock bookkeeping.
            self.locked_by = None
            self.lock_time = None
        super(Host, self).save(*args, **kwargs)
        if first_time:
            everyone = AclGroup.objects.get(name='Everyone')
            everyone.hosts.add(self)
        # Fire on_attribute_changed for any recorded attributes (status)
        # that changed since the last save.
        self._check_for_updated_attributes()


    def delete(self):
        """Delete the host, aborting any of its queue entries first."""
        AclGroup.check_for_acl_violation_hosts([self])
        for queue_entry in self.hostqueueentry_set.all():
            queue_entry.deleted = True
            queue_entry.abort()
        super(Host, self).delete()


    def on_attribute_changed(self, attribute, old_value):
        """Log status transitions; 'status' is the only tracked attribute.

        @param attribute: Name of the changed attribute (always 'status').
        @param old_value: The attribute's previous value.
        """
        assert attribute == 'status'
        logging.info(self.hostname + ' -> ' + self.status)


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on this host.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        queue_entry = HostQueueEntry.create(host=self, job=job,
                                            is_template=is_template)
        # allow recovery of dead hosts from the frontend
        if not self.active_queue_entry() and self.is_dead():
            self.status = Host.Status.READY
            self.save()
        queue_entry.save()

        # Prevent the scheduler from picking this host again for the same job.
        block = IneligibleHostQueue(job=job, host=self)
        block.save()


    def platform(self):
        """The platform of the host."""
        # TODO(showard): slightly hacky?
        platforms = self.labels.filter(platform=True)
        if len(platforms) == 0:
            return None
        return platforms[0]
    platform.short_description = 'Platform'


    @classmethod
    def check_no_platform(cls, hosts):
        """Verify the specified hosts have no associated platforms.

        @param cls: Implicit class object.
        @param hosts: The hosts to verify.
        @raises model_logic.ValidationError if any hosts already have a
            platform.
        """
        Host.objects.populate_relationships(hosts, Label, 'label_list')
        errors = []
        for host in hosts:
            platforms = [label.name for label in host.label_list
                         if label.platform]
            if platforms:
                # do a join, just in case this host has multiple platforms,
                # we'll be able to see it
                errors.append('Host %s already has a platform: %s' % (
                              host.hostname, ', '.join(platforms)))
        if errors:
            raise model_logic.ValidationError({'labels': '; '.join(errors)})


    @classmethod
    def check_board_labels_allowed(cls, hosts, new_labels=[]):
        """Verify the specified hosts have valid board labels and the given
        new board labels can be added.

        @param cls: Implicit class object.
        @param hosts: The hosts to verify.
        @param new_labels: A list of labels to be added to the hosts.

        @raises model_logic.ValidationError if any host has invalid board labels
                or the given board labels cannot be added to the hosts.
        """
        # NOTE(review): mutable default [] is shared across calls; it is never
        # mutated here so this is benign, but None would be safer.
        Host.objects.populate_relationships(hosts, Label, 'label_list')
        errors = []
        for host in hosts:
            boards = [label.name for label in host.label_list
                      if label.name.startswith('board:')]
            if not server_utils.board_labels_allowed(boards + new_labels):
                # do a join, just in case this host has multiple boards,
                # we'll be able to see it
                errors.append('Host %s already has board labels: %s' % (
                              host.hostname, ', '.join(boards)))
        if errors:
            raise model_logic.ValidationError({'labels': '; '.join(errors)})


    def is_dead(self):
        """Returns whether the host is dead (has status repair failed)."""
        return self.status == Host.Status.REPAIR_FAILED


    def active_queue_entry(self):
        """Returns the active queue entry for this host, or None if none."""
        active = list(self.hostqueueentry_set.filter(active=True))
        if not active:
            return None
        # The scheduler should never run two entries on one host at once.
        assert len(active) == 1, ('More than one active entry for '
                                  'host ' + self.hostname)
        return active[0]


    def _get_attribute_model_and_args(self, attribute):
        """Return the model and lookup kwargs for a host attribute.

        @param attribute: Name of the attribute being looked up.
        """
        return HostAttribute, dict(host=self, attribute=attribute)


    @classmethod
    def get_attribute_model(cls):
        """Return the attribute model.

        Override method in parent class. See ModelExtensions for details.
        @returns: The attribute model of Host.
        """
        return HostAttribute


    class Meta:
        """Metadata for the Host class."""
        db_table = 'afe_hosts'


    def __unicode__(self):
        return unicode(self.hostname)
750
751
class HostAttribute(dbmodels.Model, model_logic.ModelExtensions):
    """Arbitrary keyvals associated with hosts."""

    # Serialize the owning host reference; allow 'value' to be updated
    # locally from an incoming payload.
    SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])
    host = dbmodels.ForeignKey(Host)
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for the HostAttribute class."""
        db_table = 'afe_host_attributes'


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use host_id and attribute to search for a existing record.

        @param data: Serialized data containing 'host_id' and 'attribute'.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        # TODO(fdeng): We should use host_id and attribute together as
        #              a primary key in the db.
        return cls.objects.get(host_id=data['host_id'],
                               attribute=data['attribute'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on master and shards.

        @param data: A dictionary of data to deserialize.

        @returns: A HostAttribute object.
        """
        if data:
            # Bug fix: use a default so a payload without an 'id' key no
            # longer raises KeyError.
            data.pop('id', None)
        return super(HostAttribute, cls).deserialize(data)
796
797
class Test(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    author: author name
    description: description of the test
    name: test name
    time: short, medium, long
    test_class: the class the test belongs in
    test_category: the category for the test
    test_type: Client or Server
    path: path to pass to run_test()
    sync_count: a number >=1 (default 1).  A value of 1 means an async
                job; a value >1 means a sync job requiring that many
                machines (e.g. sync_count = 2 requires two machines).
    Optional:
    dependencies: what the test requires to run, as a comma-delimited list
    dependency_labels: many-to-many relationship with labels corresponding to
                       test dependencies.
    experimental: when True, production servers will ignore the test
    run_verify: whether the scheduler should run the verify stage
    run_reset: whether the scheduler should run the reset stage
    test_retry: number of times to retry a test that did not complete
                successfully (optional, default: 0)
    """
    TestTime = enum.Enum('SHORT', 'MEDIUM', 'LONG', start_value=1)

    name = dbmodels.CharField(max_length=255, unique=True)
    author = dbmodels.CharField(max_length=255)
    test_class = dbmodels.CharField(max_length=255)
    test_category = dbmodels.CharField(max_length=255)
    dependencies = dbmodels.CharField(max_length=255, blank=True)
    description = dbmodels.TextField(blank=True)
    experimental = dbmodels.BooleanField(default=True)
    run_verify = dbmodels.BooleanField(default=False)
    test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(),
                                           default=TestTime.MEDIUM)
    test_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices())
    sync_count = dbmodels.IntegerField(default=1)
    path = dbmodels.CharField(max_length=255, unique=True)
    test_retry = dbmodels.IntegerField(blank=True, default=0)
    run_reset = dbmodels.BooleanField(default=True)

    dependency_labels = (
        dbmodels.ManyToManyField(Label, blank=True,
                                 db_table='afe_autotests_dependency_labels'))
    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def admin_description(self):
        """Returns an HTML-escaped description for the admin UI."""
        return ('<span style="white-space:pre">%s</span>'
                % saxutils.escape(self.description))
    admin_description.allow_tags = True
    admin_description.short_description = 'Description'


    class Meta:
        """Metadata for class Test."""
        db_table = 'afe_autotests'

    def __unicode__(self):
        return unicode(self.name)
863
864
class TestParameter(dbmodels.Model):
    """
    A declared parameter of a test
    """
    test = dbmodels.ForeignKey(Test)
    name = dbmodels.CharField(max_length=255)

    class Meta:
        """Metadata for class TestParameter."""
        db_table = 'afe_test_parameters'
        # A parameter name may only be declared once per test.
        unique_together = ('test', 'name')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.test.name)
879
880
class Profiler(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    name: profiler name
    test_type: Client or Server
        (NOTE(review): documented here but not declared as a field below --
        confirm whether this line is stale.)

    Optional:
    description: arbitrary text description
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    class Meta:
        """Metadata for class Profiler."""
        db_table = 'afe_profilers'

    def __unicode__(self):
        return unicode(self.name)
903
904
class AclGroup(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    name: name of ACL group

    Optional:
    description: arbitrary description of group
    """

    SERIALIZATION_LINKS_TO_FOLLOW = set(['users'])

    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.CharField(max_length=255, blank=True)
    users = dbmodels.ManyToManyField(User, blank=False,
                                     db_table='afe_acl_groups_users')
    hosts = dbmodels.ManyToManyField(Host, blank=True,
                                     db_table='afe_acl_groups_hosts')

    name_field = 'name'
    objects = model_logic.ExtendedManager()

    @staticmethod
    def check_for_acl_violation_hosts(hosts):
        """Verify the current user has access to the specified hosts.

        @param hosts: The hosts to verify against.
        @raises AclAccessViolation if the current user doesn't have access
            to a host.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        accessible_host_ids = set(
            host.id for host in Host.objects.filter(aclgroup__users=user))
        for host in hosts:
            # Check if the user has access to this host,
            # but only if it is not a metahost or a one-time-host.
            no_access = (isinstance(host, Host)
                         and not host.invalid
                         and int(host.id) not in accessible_host_ids)
            if no_access:
                raise AclAccessViolation("%s does not have access to %s" %
                                         (str(user), str(host)))


    @staticmethod
    def check_abort_permissions(queue_entries):
        """Look for queue entries that aren't abortable by the current user.

        An entry is not abortable if:
           * the job isn't owned by this user, and
           * the machine isn't ACL-accessible, or
           * the machine is in the "Everyone" ACL

        @param queue_entries: The queue entries to check.
        @raises AclAccessViolation if a queue entry is not abortable by the
            current user.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        not_owned = queue_entries.exclude(job__owner=user.login)
        # I do this using ID sets instead of just Django filters because
        # filtering on M2M dbmodels is broken in Django 0.96. It's better in
        # 1.0.
        # TODO: Use Django filters, now that we're using 1.0.
        accessible_ids = set(
            entry.id for entry
            in not_owned.filter(host__aclgroup__users__login=user.login))
        public_ids = set(entry.id for entry
                         in not_owned.filter(host__aclgroup__name='Everyone'))
        cannot_abort = [entry for entry in not_owned.select_related()
                        if entry.id not in accessible_ids
                        or entry.id in public_ids]
        if not cannot_abort:
            return
        entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner,
                                              entry.host_or_metahost_name())
                                for entry in cannot_abort)
        raise AclAccessViolation('You cannot abort the following job entries: '
                                 + entry_names)


    def check_for_acl_violation_acl_group(self):
        """Verifies the current user has access to this ACL group.

        @raises AclAccessViolation if the current user doesn't have access to
            this ACL group.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        if self.name == 'Everyone':
            raise AclAccessViolation("You cannot modify 'Everyone'!")
        if user not in self.users.all():
            raise AclAccessViolation("You do not have access to %s"
                                     % self.name)

    @staticmethod
    def on_host_membership_change():
        """Invoked when host membership changes."""
        everyone = AclGroup.objects.get(name='Everyone')

        # find hosts that aren't in any ACL group and add them to Everyone
        # TODO(showard): this is a bit of a hack, since the fact that this query
        # works is kind of a coincidence of Django internals.  This trick
        # doesn't work in general (on all foreign key relationships).  I'll
        # replace it with a better technique when the need arises.
        orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True)
        everyone.hosts.add(*orphaned_hosts.distinct())

        # find hosts in both Everyone and another ACL group, and remove them
        # from Everyone
        hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone')
        acled_hosts = set()
        for host in hosts_in_everyone:
            # Has an ACL group other than Everyone
            if host.aclgroup_set.count() > 1:
                acled_hosts.add(host)
        everyone.hosts.remove(*acled_hosts)


    def delete(self):
        """Deletes this group, then rebalances 'Everyone' membership.

        @raises AclAccessViolation if this is the 'Everyone' group or the
            current user doesn't have access to this group.
        """
        if self.name == 'Everyone':
            raise AclAccessViolation("You cannot delete 'Everyone'!")
        self.check_for_acl_violation_acl_group()
        super(AclGroup, self).delete()
        self.on_host_membership_change()


    def add_current_user_if_empty(self):
        """Adds the current user if the set of users is empty."""
        if not self.users.count():
            self.users.add(User.current_user())


    def perform_after_save(self, change):
        """Called after a save.

        @param change: Whether there was a change.
        """
        if not change:
            self.users.add(User.current_user())
        self.add_current_user_if_empty()
        self.on_host_membership_change()


    def save(self, *args, **kwargs):
        """Saves the group after checking ACLs against the stored original.

        @param args: Positional arguments passed through to Model.save.
        @param kwargs: Keyword arguments passed through to Model.save.
        """
        change = bool(self.id)
        if change:
            # Check the original object for an ACL violation
            AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group()
        super(AclGroup, self).save(*args, **kwargs)
        self.perform_after_save(change)


    class Meta:
        """Metadata for class AclGroup."""
        db_table = 'afe_acl_groups'

    def __unicode__(self):
        return unicode(self.name)
1067
1068
class ParameterizedJob(dbmodels.Model):
    """
    Auxiliary configuration for a parameterized job.

    This class is obsolete, and ought to be dead.  Due to a series of
    unfortunate events, it can't be deleted:
      * In `class Job` we're required to keep a reference to this class
        for the sake of the scheduler unit tests.
      * The existence of the reference in `Job` means that certain
        methods here will get called from the `get_jobs` RPC.
    So, the definitions below seem to be the minimum stub we can support
    unless/until we change the database schema.
    """

    @classmethod
    def smart_get(cls, id_or_name, *args, **kwargs):
        """For compatibility with Job.add_object.

        @param cls: Implicit class object.
        @param id_or_name: The ID or name to get.
        @param args: Non-keyword arguments.
        @param kwargs: Keyword arguments.
        """
        return cls.objects.get(pk=id_or_name)


    def job(self):
        """Returns the job if it exists, or else None."""
        jobs = self.job_set.all()
        assert jobs.count() <= 1
        # Conditional expression instead of the fragile `x and y or z`
        # idiom; a Job instance is always truthy, so behavior is unchanged.
        return jobs[0] if jobs else None


    class Meta:
        """Metadata for class ParameterizedJob."""
        db_table = 'afe_parameterized_jobs'

    def __unicode__(self):
        # NOTE(review): relies on a legacy `test` attribute that is not
        # declared on this stub -- confirm it exists before calling.
        return u'%s (parameterized) - %s' % (self.test.name, self.job())
1108
1109
class JobManager(model_logic.ExtendedManager):
    'Custom manager to provide efficient status counts querying.'
    def get_status_counts(self, job_ids):
        """Returns a dict mapping the given job IDs to their status count dicts.

        @param job_ids: A list of job IDs.

        @returns: A dict of the form {job_id: {full_status: count, ...}, ...}.
        """
        if not job_ids:
            return {}
        # Pass the ids as query parameters instead of interpolating them into
        # the SQL text, so the database driver escapes them safely.
        placeholders = ','.join(['%s'] * len(job_ids))
        cursor = connection.cursor()
        cursor.execute("""
            SELECT job_id, status, aborted, complete, COUNT(*)
            FROM afe_host_queue_entries
            WHERE job_id IN (%s)
            GROUP BY job_id, status, aborted, complete
            """ % placeholders, job_ids)
        all_job_counts = dict((job_id, {}) for job_id in job_ids)
        for job_id, status, aborted, complete, count in cursor.fetchall():
            job_dict = all_job_counts[job_id]
            full_status = HostQueueEntry.compute_full_status(status, aborted,
                                                             complete)
            job_dict.setdefault(full_status, 0)
            job_dict[full_status] += count
        return all_job_counts
1135
1136
class Job(dbmodels.Model, model_logic.ModelExtensions):
    """\
    owner: username of job owner
    name: job name (does not have to be unique)
    priority: Integer priority value.  Higher is more important.
    control_file: contents of control file
    control_type: Client or Server
    created_on: date of job creation
    submitted_on: date of job submission
    synch_count: how many hosts should be used per autoserv execution
    run_verify: Whether or not to run the verify phase
    run_reset: Whether or not to run the reset phase
    timeout: DEPRECATED - hours from queuing time until job times out
    timeout_mins: minutes from job queuing time until the job times out
    max_runtime_hrs: DEPRECATED - hours from job starting time until job
                     times out
    max_runtime_mins: minutes from job starting time until job times out
    email_list: list of people to email on completion delimited by any of:
                white space, ',', ':', ';'
    dependency_labels: many-to-many relationship with labels corresponding to
                       job dependencies
    reboot_before: Never, If dirty, or Always
    reboot_after: Never, If all tests passed, or Always
    parse_failed_repair: if True, a failed repair launched by this job will have
    its results parsed as part of the job.
    drone_set: The set of drones to run this job on
    parent_job: Parent job (optional)
    test_retry: Number of times to retry test if the test did not complete
                successfully. (optional, default: 0)
    require_ssp: Require server-side packaging unless require_ssp is set to
                 False. (optional, default: None)
    """

    # TODO: Investigate, if jobkeyval_set is really needed.
    # dynamic_suite will write them into an attached file for the drone, but
    # it doesn't seem like they are actually used. If they aren't used, remove
    # jobkeyval_set here.
    SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels',
                                         'hostqueueentry_set',
                                         'jobkeyval_set',
                                         'shard'])

    # SQL for selecting jobs that should be sent to shard.
    # We use raw sql as django filters were not optimized.
    # The following jobs are excluded by the SQL.
    #     - Non-aborted jobs known to shard as specified in |known_ids|.
    #       Note: jobs aborted on the master, even if already known to the
    #       shard, will be sent to the shard again so that the shard can
    #       abort them.
    #     - Completed jobs
    #     - Active jobs
    #     - Jobs without host_queue_entries
    NON_ABORTED_KNOWN_JOBS = '(t2.aborted = 0 AND t1.id IN (%(known_ids)s))'

    SQL_SHARD_JOBS = (
        'SELECT DISTINCT(t1.id) FROM afe_jobs t1 '
        'INNER JOIN afe_host_queue_entries t2  ON '
        '  (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 '
        '   %(check_known_jobs)s) '
        'LEFT OUTER JOIN afe_jobs_dependency_labels t3 ON (t1.id = t3.job_id) '
        'JOIN afe_shards_labels t4 '
        '  ON (t4.label_id = t3.label_id OR t4.label_id = t2.meta_host) '
        'WHERE t4.shard_id = %(shard_id)s'
        )

    # Jobs can be created with assigned hosts and have no dependency
    # labels nor meta_host.
    # We are looking for:
    #     - a job whose hqe's meta_host is null
    #     - a job whose hqe has a host
    #     - one of the host's labels matches the shard's label.
    # Non-aborted known jobs, completed jobs, active jobs, jobs
    # without hqe are excluded as we do with SQL_SHARD_JOBS.
    SQL_SHARD_JOBS_WITH_HOSTS = (
        'SELECT DISTINCT(t1.id) FROM afe_jobs t1 '
        'INNER JOIN afe_host_queue_entries t2 ON '
        '  (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 '
        '   AND t2.meta_host IS NULL AND t2.host_id IS NOT NULL '
        '   %(check_known_jobs)s) '
        'LEFT OUTER JOIN afe_hosts_labels t3 ON (t2.host_id = t3.host_id) '
        'WHERE (t3.label_id IN '
        '  (SELECT label_id FROM afe_shards_labels '
        '   WHERE shard_id = %(shard_id)s))'
        )

    # Even if we had filters about complete, active and aborted
    # bits in the above two SQLs, there is a chance that
    # the result may still contain a job with an hqe with 'complete=1'
    # or 'active=1' or 'aborted=0 and afe_job.id in known jobs.'
    # This happens when a job has two (or more) hqes and at least
    # one hqe has different bits than others.
    # We use a second sql to ensure we exclude all un-desired jobs.
    SQL_JOBS_TO_EXCLUDE =(
        'SELECT t1.id FROM afe_jobs t1 '
        'INNER JOIN afe_host_queue_entries t2 ON '
        '  (t1.id = t2.job_id) '
        'WHERE (t1.id in (%(candidates)s) '
        '  AND (t2.complete=1 OR t2.active=1 '
        '  %(check_known_jobs)s))'
        )

    def _deserialize_relation(self, link, data):
        """Point deserialized child rows at this job before recursing.

        @param link: Name of the relation being deserialized.
        @param data: List of serialized child-object dictionaries.
        """
        if link in ['hostqueueentry_set', 'jobkeyval_set']:
            for obj in data:
                obj['job_id'] = self.id

        super(Job, self)._deserialize_relation(link, data)


    def custom_deserialize_relation(self, link, data):
        """Deserialize the 'shard' link; no other link is supported.

        @param link: Must be 'shard'.
        @param data: Serialized Shard dictionary.
        """
        assert link == 'shard', 'Link %s should not be deserialized' % link
        self.shard = Shard.deserialize(data)


    def sanity_check_update_from_shard(self, shard, updated_serialized):
        """Raise if an update for this job should not come from |shard|.

        @param shard: The shard the update was received from.
        @param updated_serialized: The serialized update, used in the error
            message.

        @raises error.IgnorableUnallowedRecordsSentToMaster if this job is
            assigned to a different shard.
        """
        # If the job got aborted on the master after the client fetched it
        # no shard_id will be set. The shard might still push updates though,
        # as the job might complete before the abort bit syncs to the shard.
        # Alternative considered: The master scheduler could be changed to not
        # set aborted jobs to completed that are sharded out. But that would
        # require database queries and seemed more complicated to implement.
        # This seems safe to do, as there won't be updates pushed from the
        # wrong shard: shards should be powered off and wiped when they are
        # removed from the master.
        if self.shard_id and self.shard_id != shard.id:
            raise error.IgnorableUnallowedRecordsSentToMaster(
                'Job id=%s is assigned to shard (%s). Cannot update it with %s '
                'from shard %s.' % (self.id, self.shard_id, updated_serialized,
                                    shard.id))


    # TIMEOUT is deprecated.
    DEFAULT_TIMEOUT = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_default', default=24)
    DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60)
    # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is
    # completed.
    DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72)
    DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60)
    DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool,
        default=False)

    owner = dbmodels.CharField(max_length=255)
    name = dbmodels.CharField(max_length=255)
    priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT)
    control_file = dbmodels.TextField(null=True, blank=True)
    control_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices(),
        blank=True, # to allow 0
        default=control_data.CONTROL_TYPE.CLIENT)
    created_on = dbmodels.DateTimeField()
    synch_count = dbmodels.IntegerField(blank=True, default=0)
    timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT)
    run_verify = dbmodels.BooleanField(default=False)
    email_list = dbmodels.CharField(max_length=250, blank=True)
    dependency_labels = (
            dbmodels.ManyToManyField(Label, blank=True,
                                     db_table='afe_jobs_dependency_labels'))
    reboot_before = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootBefore.choices(), blank=True,
        default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootAfter.choices(), blank=True,
        default=DEFAULT_REBOOT_AFTER)
    parse_failed_repair = dbmodels.BooleanField(
        default=DEFAULT_PARSE_FAILED_REPAIR)
    # max_runtime_hrs is deprecated. Will be removed after switch to mins is
    # completed.
    max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS)
    max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS)
    drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)

    # TODO(jrbarnette)  We have to keep `parameterized_job` around or it
    # breaks the scheduler_models unit tests (and fixing the unit tests
    # will break the scheduler, so don't do that).
    #
    # The ultimate fix is to delete the column from the database table
    # at which point, you _must_ delete this.  Until you're ready to do
    # that, DON'T MUCK WITH IT.
    parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True,
                                            blank=True)

    parent_job = dbmodels.ForeignKey('self', blank=True, null=True)

    test_retry = dbmodels.IntegerField(blank=True, default=0)

    run_reset = dbmodels.BooleanField(default=True)

    timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS)

    # If this is None on the master, a slave should be found.
    # If this is None on a slave, it should be synced back to the master
    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    # If this is None, server-side packaging will be used for server side test,
    # unless it's disabled in global config AUTOSERV/enable_ssp_container.
    require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True)

    # custom manager
    objects = JobManager()


    @decorators.cached_property
    def labels(self):
        """All the labels of this job"""
        # We need to convert dependency_labels to a list, because all() gives us
        # back an iterator, and storing/caching an iterator means we'd only be
        # able to read from it once.
        return list(self.dependency_labels.all())


    def is_server_job(self):
        """Returns whether this job is of type server."""
        return self.control_type == control_data.CONTROL_TYPE.SERVER


    @classmethod
    def create(cls, owner, options, hosts):
        """Creates a job.

        The job is created by taking some information (the listed args) and
        filling in the rest of the necessary information.

        @param cls: Implicit class object.
        @param owner: The owner for the job.
        @param options: An options object.
        @param hosts: The hosts to use.
        """
        AclGroup.check_for_acl_violation_hosts(hosts)

        control_file = options.get('control_file')

        user = User.current_user()
        if options.get('reboot_before') is None:
            options['reboot_before'] = user.get_reboot_before_display()
        if options.get('reboot_after') is None:
            options['reboot_after'] = user.get_reboot_after_display()

        drone_set = DroneSet.resolve_name(options.get('drone_set'))

        if options.get('timeout_mins') is None and options.get('timeout'):
            options['timeout_mins'] = options['timeout'] * 60

        job = cls.add_object(
            owner=owner,
            name=options['name'],
            priority=options['priority'],
            control_file=control_file,
            control_type=options['control_type'],
            synch_count=options.get('synch_count'),
            # timeout needs to be deleted in the future.
            timeout=options.get('timeout'),
            timeout_mins=options.get('timeout_mins'),
            max_runtime_mins=options.get('max_runtime_mins'),
            run_verify=options.get('run_verify'),
            email_list=options.get('email_list'),
            reboot_before=options.get('reboot_before'),
            reboot_after=options.get('reboot_after'),
            parse_failed_repair=options.get('parse_failed_repair'),
            created_on=datetime.now(),
            drone_set=drone_set,
            parent_job=options.get('parent_job_id'),
            test_retry=options.get('test_retry'),
            run_reset=options.get('run_reset'),
            require_ssp=options.get('require_ssp'))

        job.dependency_labels = options['dependencies']

        if options.get('keyvals'):
            for key, value in options['keyvals'].iteritems():
                JobKeyval.objects.create(job=job, key=key, value=value)

        return job


    @classmethod
    def assign_to_shard(cls, shard, known_ids):
        """Assigns unassigned jobs to a shard.

        For all labels that have been assigned to this shard, all jobs that
        have this label, are assigned to this shard.

        Jobs that are assigned to the shard but aren't already present on the
        shard are returned.

        @param shard: The shard to assign jobs to.
        @param known_ids: List of all ids of incomplete jobs, the shard already
                          knows about.
                          This is used to figure out which jobs should be sent
                          to the shard. If shard_ids were used instead, jobs
                          would only be transferred once, even if the client
                          failed persisting them.
                          The number of unfinished jobs usually lies in O(1000).
                          Assuming one id takes 8 chars in the json, this means
                          overhead that lies in the lower kilobyte range.
                          A not in query with 5000 id's takes about 30ms.

        @returns The job objects that should be sent to the shard.
        """
        # Disclaimer: Concurrent heartbeats should not occur in today's setup.
        # If this changes or they are triggered manually, this applies:
        # Jobs may be returned more than once by concurrent calls of this
        # function, as there is a race condition between SELECT and UPDATE.
        job_ids = set([])
        check_known_jobs_exclude = ''
        check_known_jobs_include = ''

        if known_ids:
            check_known_jobs = (
                    cls.NON_ABORTED_KNOWN_JOBS %
                    {'known_ids': ','.join([str(i) for i in known_ids])})
            check_known_jobs_exclude = 'AND NOT ' + check_known_jobs
            check_known_jobs_include = 'OR ' + check_known_jobs

        for sql in [cls.SQL_SHARD_JOBS, cls.SQL_SHARD_JOBS_WITH_HOSTS]:
            query = Job.objects.raw(sql % {
                    'check_known_jobs': check_known_jobs_exclude,
                    'shard_id': shard.id})
            job_ids |= set([j.id for j in query])

        if job_ids:
            query = Job.objects.raw(
                    cls.SQL_JOBS_TO_EXCLUDE %
                    {'check_known_jobs': check_known_jobs_include,
                     'candidates': ','.join([str(i) for i in job_ids])})
            job_ids -= set([j.id for j in query])

        if job_ids:
            Job.objects.filter(pk__in=job_ids).update(shard=shard)
            return list(Job.objects.filter(pk__in=job_ids).all())
        return []


    def queue(self, hosts, is_template=False):
        """Enqueue a job on the given hosts.

        @param hosts: The hosts to use.
        @param is_template: Whether the status should be "Template".
        """
        if not hosts:
            # hostless job
            entry = HostQueueEntry.create(job=self, is_template=is_template)
            entry.save()
            return

        for host in hosts:
            host.enqueue_job(self, is_template=is_template)


    def user(self):
        """Gets the user of this job, or None if it doesn't exist."""
        try:
            return User.objects.get(login=self.owner)
        # NOTE(review): self.DoesNotExist is Job.DoesNotExist, but
        # User.objects.get raises User.DoesNotExist, which this clause would
        # not catch -- confirm whether this is the intended behavior.
        except self.DoesNotExist:
            return None


    def abort(self):
        """Aborts this job."""
        for queue_entry in self.hostqueueentry_set.all():
            queue_entry.abort()


    def tag(self):
        """Returns a string tag for this job."""
        return server_utils.get_job_tag(self.id, self.owner)


    def keyval_dict(self):
        """Returns all keyvals for this job as a dictionary."""
        return dict((keyval.key, keyval.value)
                    for keyval in self.jobkeyval_set.all())


    @classmethod
    def get_attribute_model(cls):
        """Return the attribute model.

        Override method in parent class. This class is called when
        deserializing the one-to-many relationship between Job and JobKeyval.
        On deserialization, we will try to clear any existing job keyvals
        associated with a job to avoid any inconsistency.
        Though Job doesn't implement ModelWithAttribute, we still treat
        it as an attribute model for this purpose.

        @returns: The attribute model of Job.
        """
        return JobKeyval


    class Meta:
        """Metadata for class Job."""
        db_table = 'afe_jobs'

    def __unicode__(self):
        return u'%s (%s-%s)' % (self.name, self.id, self.owner)
1536
1537
class JobKeyval(dbmodels.Model, model_logic.ModelExtensions):
    """Keyvals associated with jobs"""

    SERIALIZATION_LINKS_TO_KEEP = set(['job'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])

    job = dbmodels.ForeignKey(Job)
    key = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use job_id and key to search for an existing record.

        @param data: A dictionary with 'job_id' and 'key' entries.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        # TODO(fdeng): We should use job_id and key together as
        #              a primary key in the db.
        return cls.objects.get(job_id=data['job_id'], key=data['key'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on master and shards.

        @param data: A dictionary of data to deserialize.

        @returns: A JobKeyval object.
        """
        if data:
            # Use a default so a payload without an 'id' key does not raise
            # KeyError; we only care that no id makes it into the object.
            data.pop('id', None)
        return super(JobKeyval, cls).deserialize(data)


    class Meta:
        """Metadata for class JobKeyval."""
        db_table = 'afe_job_keyvals'
1583
1584
class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions):
    """Represents an ineligible host queue.

    A (job, host) pair; presumably records a host that must not be
    chosen for the job. NOTE(review): the exact scheduling semantics
    are not visible from this model -- confirm against the scheduler.
    """
    job = dbmodels.ForeignKey(Job)
    host = dbmodels.ForeignKey(Host)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class IneligibleHostQueue."""
        db_table = 'afe_ineligible_host_queues'
1595
1596
class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
    """Represents a host queue entry."""

    # Shard/master serialization hints: follow (recursively serialize)
    # meta_host, keep the host link, and allow 'aborted' to be updated
    # locally.
    SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host'])
    SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted'])


    def custom_deserialize_relation(self, link, data):
        """Deserialize the 'meta_host' relation into a Label.

        @param link: Relation name; only 'meta_host' is supported.
        @param data: Serialized data for the related Label.
        """
        assert link == 'meta_host'
        self.meta_host = Label.deserialize(data)


    def sanity_check_update_from_shard(self, shard, updated_serialized,
                                       job_ids_sent):
        """Validate that a shard-sent update refers to a job in its payload.

        @param shard: The shard the update came from.
        @param updated_serialized: The serialized HostQueueEntry update.
        @param job_ids_sent: Job ids included in the shard's payload.

        @raises error.IgnorableUnallowedRecordsSentToMaster: If this
                entry's job was not part of the payload.
        """
        if self.job_id not in job_ids_sent:
            raise error.IgnorableUnallowedRecordsSentToMaster(
                'Sent HostQueueEntry without corresponding '
                'job entry: %s' % updated_serialized)


    # Status constants shared with the rest of autotest via
    # host_queue_entry_states.
    Status = host_queue_entry_states.Status
    ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES
    COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES
    PRE_JOB_STATUSES = host_queue_entry_states.PRE_JOB_STATUSES
    IDLE_PRE_JOB_STATUSES = host_queue_entry_states.IDLE_PRE_JOB_STATUSES

    job = dbmodels.ForeignKey(Job)
    host = dbmodels.ForeignKey(Host, blank=True, null=True)
    status = dbmodels.CharField(max_length=255)
    meta_host = dbmodels.ForeignKey(Label, blank=True, null=True,
                                    db_column='meta_host')
    active = dbmodels.BooleanField(default=False)
    complete = dbmodels.BooleanField(default=False)
    deleted = dbmodels.BooleanField(default=False)
    execution_subdir = dbmodels.CharField(max_length=255, blank=True,
                                          default='')
    # If atomic_group is set, this is a virtual HostQueueEntry that will
    # be expanded into many actual hosts within the group at schedule time.
    atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True)
    aborted = dbmodels.BooleanField(default=False)
    started_on = dbmodels.DateTimeField(null=True, blank=True)
    finished_on = dbmodels.DateTimeField(null=True, blank=True)

    objects = model_logic.ExtendedManager()


    def __init__(self, *args, **kwargs):
        super(HostQueueEntry, self).__init__(*args, **kwargs)
        # Track changes to 'status' so on_attribute_changed() can log
        # transitions when the entry is saved.
        self._record_attributes(['status'])


    @classmethod
    def create(cls, job, host=None, meta_host=None,
                 is_template=False):
        """Creates a new host queue entry.

        Note: the entry is constructed but not saved.

        @param cls: Implicit class object.
        @param job: The associated job.
        @param host: The associated host.
        @param meta_host: The associated meta host.
        @param is_template: Whether the status should be "Template".
        """
        if is_template:
            status = cls.Status.TEMPLATE
        else:
            status = cls.Status.QUEUED

        return cls(job=job, host=host, meta_host=meta_host, status=status)


    def save(self, *args, **kwargs):
        """Save the entry, syncing active/complete flags with status first."""
        self._set_active_and_complete()
        super(HostQueueEntry, self).save(*args, **kwargs)
        # Fires on_attribute_changed() for any recorded attribute
        # ('status') that changed since the last save.
        self._check_for_updated_attributes()


    def execution_path(self):
        """
        Path to this entry's results (relative to the base results directory).
        """
        return server_utils.get_hqe_exec_path(self.job.tag(),
                                              self.execution_subdir)


    def host_or_metahost_name(self):
        """Returns the first non-None name found in priority order.

        The priority order checked is: (1) host name; (2) meta host name
        """
        if self.host:
            return self.host.hostname
        else:
            # An entry must have either a host or a meta_host.
            assert self.meta_host
            return self.meta_host.name


    def _set_active_and_complete(self):
        """Derive the active/complete flags from the current status."""
        if self.status in self.ACTIVE_STATUSES:
            self.active, self.complete = True, False
        elif self.status in self.COMPLETE_STATUSES:
            self.active, self.complete = False, True
        else:
            self.active, self.complete = False, False


    def on_attribute_changed(self, attribute, old_value):
        """Log status transitions; 'status' is the only recorded attribute."""
        assert attribute == 'status'
        logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id,
                     self.status)


    def is_meta_host_entry(self):
        'True if this entry has a meta_host instead of a host.'
        return self.host is None and self.meta_host is not None


    # This code is shared between rpc_interface and models.HostQueueEntry.
    # Sadly due to circular imports between the 2 (crbug.com/230100) making it
    # a class method was the best way to refactor it. Attempting to put it in
    # rpc_utils or a new utils module failed as that would require us to import
    # models.py but to call it from here we would have to import the utils.py
    # thus creating a cycle.
    @classmethod
    def abort_host_queue_entries(cls, host_queue_entries):
        """Aborts a collection of host_queue_entries.

        Abort these host queue entry and all host queue entries of jobs created
        by them.

        @param host_queue_entries: List of host queue entries we want to abort.
        """
        # This isn't completely immune to race conditions since it's not atomic,
        # but it should be safe given the scheduler's behavior.

        # TODO(milleral): crbug.com/230100
        # The |abort_host_queue_entries| rpc does nearly exactly this,
        # however, trying to re-use the code generates some horrible
        # circular import error.  It'd be nice to refactor things around
        # sometime so the code could be reused.

        # Fixpoint algorithm to find the whole tree of HQEs to abort to
        # minimize the total number of database queries:
        children = set()
        new_children = set(host_queue_entries)
        while new_children:
            children.update(new_children)
            new_child_ids = [hqe.job_id for hqe in new_children]
            new_children = HostQueueEntry.objects.filter(
                    job__parent_job__in=new_child_ids,
                    complete=False, aborted=False).all()
            # To handle circular parental relationships
            new_children = set(new_children) - children

        # Associate a user with the host queue entries that we're about
        # to abort so that we can look up who to blame for the aborts.
        now = datetime.now()
        user = User.current_user()
        # bulk_create bypasses AbortedHostQueueEntry.save(), so
        # aborted_on is supplied explicitly here.
        aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe,
                aborted_by=user, aborted_on=now) for hqe in children]
        AbortedHostQueueEntry.objects.bulk_create(aborted_hqes)
        # Bulk update all of the HQEs to set the abort bit.
        child_ids = [hqe.id for hqe in children]
        HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True)


    def abort(self):
        """ Aborts this host queue entry.

        Abort this host queue entry and all host queue entries of jobs created by
        this one.

        """
        if not self.complete and not self.aborted:
            HostQueueEntry.abort_host_queue_entries([self])


    @classmethod
    def compute_full_status(cls, status, aborted, complete):
        """Returns a modified status msg if the host queue entry was aborted.

        @param cls: Implicit class object.
        @param status: The original status message.
        @param aborted: Whether the host queue entry was aborted.
        @param complete: Whether the host queue entry was completed.
        """
        if aborted and not complete:
            return 'Aborted (%s)' % status
        return status


    def full_status(self):
        """Returns the full status of this host queue entry, as a string."""
        return self.compute_full_status(self.status, self.aborted,
                                        self.complete)


    def _postprocess_object_dict(self, object_dict):
        """Add the computed 'full_status' field to a serialized dict."""
        object_dict['full_status'] = self.full_status()


    class Meta:
        """Metadata for class HostQueueEntry."""
        db_table = 'afe_host_queue_entries'



    def __unicode__(self):
        """Render as "<hostname>/<job id> (<id>)" for display."""
        hostname = None
        if self.host:
            hostname = self.host.hostname
        return u"%s/%d (%d)" % (hostname, self.job.id, self.id)
1809
1810
class HostQueueEntryStartTimes(dbmodels.Model):
    """An auxiliary table to HostQueueEntry to index by start time.

    Each row presumably records the highest HQE id seen as of
    insert_time; NOTE(review): confirm the writer of this table.
    """
    insert_time = dbmodels.DateTimeField()
    highest_hqe_id = dbmodels.IntegerField()

    class Meta:
        """Metadata for class HostQueueEntryStartTimes."""
        db_table = 'afe_host_queue_entry_start_times'
1819
1820
class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
    """Represents an aborted host queue entry."""
    # One-to-one with the aborted HQE; also serves as the primary key.
    queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True)
    aborted_by = dbmodels.ForeignKey(User)
    aborted_on = dbmodels.DateTimeField()

    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save the record, stamping aborted_on with the current time.

        Note aborted_on is overwritten on every save(), not just the
        first insert.
        """
        self.aborted_on = datetime.now()
        super(AbortedHostQueueEntry, self).save(*args, **kwargs)

    class Meta:
        """Metadata for class AbortedHostQueueEntry."""
        db_table = 'afe_aborted_host_queue_entries'
1837
1838
class SpecialTask(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Tasks to run on hosts at the next time they are in the Ready state. Use this
    for high-priority tasks, such as forced repair or forced reinstall.

    host: host to run this task on
    task: special task to run
    time_requested: date and time the request for this task was made
    is_active: task is currently running
    is_complete: task has finished running
    is_aborted: task was aborted
    time_started: date and time the task started
    time_finished: date and time the task finished
    queue_entry: Host queue entry waiting on this task (or None, if task was not
                 started in preparation of a job)
    """
    Task = enum.Enum('Verify', 'Cleanup', 'Repair', 'Reset', 'Provision',
                     string_values=True)

    host = dbmodels.ForeignKey(Host, blank=False, null=False)
    task = dbmodels.CharField(max_length=64, choices=Task.choices(),
                              blank=False, null=False)
    requested_by = dbmodels.ForeignKey(User)
    time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False,
                                            null=False)
    is_active = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_complete = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_started = dbmodels.DateTimeField(null=True, blank=True)
    queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True)
    success = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_finished = dbmodels.DateTimeField(null=True, blank=True)

    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save the task, deriving requested_by from the queue entry.

        If a queue entry is attached, requested_by is set to the owner
        of the entry's job before saving. Accepts positional arguments
        for consistency with the other save() overrides in this module.
        """
        if self.queue_entry:
            self.requested_by = User.objects.get(
                    login=self.queue_entry.job.owner)
        super(SpecialTask, self).save(*args, **kwargs)


    def execution_path(self):
        """Returns the execution path for a special task."""
        return server_utils.get_special_task_exec_path(
                self.host.hostname, self.id, self.task, self.time_requested)


    # property to emulate HostQueueEntry.status
    @property
    def status(self):
        """Returns a host queue entry status appropriate for a special task."""
        return server_utils.get_special_task_status(
                self.is_complete, self.success, self.is_active)


    # property to emulate HostQueueEntry.started_on
    @property
    def started_on(self):
        """Returns the time at which this special task started."""
        return self.time_started


    @classmethod
    def schedule_special_task(cls, host, task):
        """Schedules a special task on a host if not already scheduled.

        If a matching task is already pending (neither active nor
        complete) it is returned instead of creating a duplicate.

        @param cls: Implicit class object.
        @param host: The host to use.
        @param task: The task to schedule.
        """
        existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task,
                                                    is_active=False,
                                                    is_complete=False)
        if existing_tasks:
            return existing_tasks[0]

        special_task = SpecialTask(host=host, task=task,
                                   requested_by=User.current_user())
        special_task.save()
        return special_task


    def abort(self):
        """ Abort this special task."""
        self.is_aborted = True
        self.save()


    def activate(self):
        """
        Sets a task as active and sets the time started to the current time.
        """
        logging.info('Starting: %s', self)
        self.is_active = True
        self.time_started = datetime.now()
        self.save()


    def finish(self, success):
        """Sets a task as completed.

        @param success: Whether or not the task was successful.
        """
        logging.info('Finished: %s', self)
        self.is_active = False
        self.is_complete = True
        self.success = success
        # Only stamp a finish time for tasks that actually started.
        if self.time_started:
            self.time_finished = datetime.now()
        self.save()


    class Meta:
        """Metadata for class SpecialTask."""
        db_table = 'afe_special_tasks'


    def __unicode__(self):
        result = u'Special Task %s (host %s, task %s, time %s)' % (
            self.id, self.host, self.task, self.time_requested)
        if self.is_complete:
            result += u' (completed)'
        elif self.is_active:
            result += u' (active)'

        return result
1967
1968
class StableVersion(dbmodels.Model, model_logic.ModelExtensions):
    """Maps a board name to a stable version string.

    NOTE(review): what 'version' denotes (OS build, firmware, ...) is
    not visible from this model; confirm against its callers.
    """

    board = dbmodels.CharField(max_length=255, unique=True)
    version = dbmodels.CharField(max_length=255)

    class Meta:
        """Metadata for class StableVersion."""
        db_table = 'afe_stable_versions'