1# pylint: disable=missing-docstring
2
3import logging
4from datetime import datetime
5import django.core
6try:
7    from django.db import models as dbmodels, connection
8except django.core.exceptions.ImproperlyConfigured:
9    raise ImportError('Django database not yet configured. Import either '
10                       'setup_django_environment or '
11                       'setup_django_lite_environment from '
12                       'autotest_lib.frontend before any imports that '
13                       'depend on django models.')
14from xml.sax import saxutils
15import common
16from autotest_lib.frontend.afe import model_logic, model_attributes
17from autotest_lib.frontend.afe import rdb_model_extensions
18from autotest_lib.frontend import settings, thread_local
19from autotest_lib.client.common_lib import enum, error, host_protections
20from autotest_lib.client.common_lib import global_config
21from autotest_lib.client.common_lib import host_queue_entry_states
22from autotest_lib.client.common_lib import control_data, priorities, decorators
23from autotest_lib.client.common_lib import site_utils
24from autotest_lib.client.common_lib.cros.graphite import autotest_es
25from autotest_lib.server import utils as server_utils
26
# job options and user preferences
DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY
# Fix: the reboot-after default must come from the RebootAfter enum, not
# RebootBefore.  Both enums appear to start with NEVER (value 0) so the
# stored default value is unchanged, but this keeps the default consistent
# with the RebootAfter.choices() used by the reboot_after model fields.
DEFAULT_REBOOT_AFTER = model_attributes.RebootAfter.NEVER
30
31
class AclAccessViolation(Exception):
    """\
    Raised when an operation is attempted without proper permissions as
    dictated by ACLs.
    """
38
class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model):
    """\
    An atomic group defines a collection of hosts which must only be scheduled
    all at once.  Any host with a label having an atomic group will only be
    scheduled for a job at the same time as other hosts sharing that label.

    Required:
      name: A name for this atomic group, e.g. 'rack23' or 'funky_net'.
      max_number_of_machines: The maximum number of machines that will be
              scheduled at once when scheduling jobs to this atomic group.
              The job.synch_count is considered the minimum.

    Optional:
      description: Arbitrary text description of this group's purpose.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)
    # This magic value is the default to simplify the scheduler logic.
    # It must be "large".  The common use of atomic groups is to want all
    # machines in the group to be used, limits on which subset used are
    # often chosen via dependency labels.
    # TODO(dennisjeffrey): Revisit this so we don't have to assume that
    # "infinity" is around 3.3 million.
    INFINITE_MACHINES = 333333333
    max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES)
    # Deleted groups are marked invalid rather than removed from the table
    # (see model_logic.ModelWithInvalid); the flag is only editable when the
    # server runs with full admin enabled.
    invalid = dbmodels.BooleanField(default=False,
                                  editable=settings.FULL_ADMIN)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on an associated atomic group of hosts.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        queue_entry = HostQueueEntry.create(atomic_group=self, job=job,
                                            is_template=is_template)
        queue_entry.save()


    def clean_object(self):
        """Clear label relations before this group is marked invalid."""
        self.label_set.clear()


    class Meta:
        """Metadata for class AtomicGroup."""
        db_table = 'afe_atomic_groups'


    def __unicode__(self):
        return unicode(self.name)
94
95
class Label(model_logic.ModelWithInvalid, dbmodels.Model):
    """\
    Required:
      name: label name

    Optional:
      kernel_config: URL/path to kernel config for jobs run on this label.
      platform: If True, this is a platform label (defaults to False).
      only_if_needed: If True, a Host with this label can only be used if that
              label is requested by the job/test (either as the meta_host or
              in the job_dependencies).
      atomic_group: The atomic group associated with this label.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    kernel_config = dbmodels.CharField(max_length=255, blank=True)
    platform = dbmodels.BooleanField(default=False)
    # Deleted labels are marked invalid rather than removed from the table
    # (see model_logic.ModelWithInvalid); the flag is only editable when the
    # server runs with full admin enabled.
    invalid = dbmodels.BooleanField(default=False,
                                    editable=settings.FULL_ADMIN)
    only_if_needed = dbmodels.BooleanField(default=False)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True)


    def clean_object(self):
        """Clear host and test relations before this label is invalidated."""
        self.host_set.clear()
        self.test_set.clear()


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on any host of this label.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        queue_entry = HostQueueEntry.create(meta_host=self, job=job,
                                            is_template=is_template)
        queue_entry.save()



    class Meta:
        """Metadata for class Label."""
        db_table = 'afe_labels'


    def __unicode__(self):
        return unicode(self.name)
146
147
class Shard(dbmodels.Model, model_logic.ModelExtensions):
    """A shard of the AFE, identified by hostname.

    hostname: the shard's hostname.
    labels: labels assigned to this shard; presumably hosts/jobs carrying
            these labels are handled by the shard — see Host.assign_to_shard.
    """

    hostname = dbmodels.CharField(max_length=255, unique=True)

    name_field = 'hostname'

    labels = dbmodels.ManyToManyField(Label, blank=True,
                                      db_table='afe_shards_labels')

    class Meta:
        """Metadata for class Shard."""
        db_table = 'afe_shards'


    def rpc_hostname(self):
        """Get the rpc hostname of the shard.

        @return: Just the shard hostname for all non-testing environments.
                 The address of the default gateway for vm testing environments.
        """
        # TODO: Figure out a better solution for testing. Since no 2 shards
        # can run on the same host, if the shard hostname is localhost we
        # conclude that it must be a vm in a test cluster. In such situations
        # a name of localhost:<port> is necessary to achieve the correct
        # afe links/redirection from the frontend (this happens through the
        # host), but for rpcs that are performed *on* the shard, they need to
        # use the address of the gateway.
        # In the virtual machine testing environment (i.e., puppylab), each
        # shard VM has a hostname like localhost:<port>. In the real cluster
        # environment, a shard node does not have 'localhost' for its hostname.
        # The following hostname substitution is needed only for the VM
        # in puppylab.
        # The 'hostname' should not be replaced in the case of real cluster.
        if site_utils.is_puppylab_vm(self.hostname):
            hostname = self.hostname.split(':')[0]
            return self.hostname.replace(
                    hostname, site_utils.DEFAULT_VM_GATEWAY)
        return self.hostname
186
187
class Drone(dbmodels.Model, model_logic.ModelExtensions):
    """
    A scheduler drone

    hostname: the drone's hostname
    """
    hostname = dbmodels.CharField(max_length=255, unique=True)

    name_field = 'hostname'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save this drone; restricted to superusers.

        @raises Exception: If the current user is not a superuser.
        """
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may edit drones')
        super(Drone, self).save(*args, **kwargs)


    def delete(self):
        """Delete this drone; restricted to superusers.

        @raises Exception: If the current user is not a superuser.
        """
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may delete drones')
        super(Drone, self).delete()


    class Meta:
        """Metadata for class Drone."""
        db_table = 'afe_drones'

    def __unicode__(self):
        return unicode(self.hostname)
218
219
class DroneSet(dbmodels.Model, model_logic.ModelExtensions):
    """
    A set of scheduler drones

    These will be used by the scheduler to decide what drones a job is allowed
    to run on.

    name: the drone set's name
    drones: the drones that are part of the set
    """
    # Feature flag and global default, both read once from the global config
    # at import time.
    DRONE_SETS_ENABLED = global_config.global_config.get_config_value(
            'SCHEDULER', 'drone_sets_enabled', type=bool, default=False)
    DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value(
            'SCHEDULER', 'default_drone_set_name', default=None)

    name = dbmodels.CharField(max_length=255, unique=True)
    drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones')

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save this drone set; restricted to superusers.

        @raises Exception: If the current user is not a superuser.
        """
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may edit drone sets')
        super(DroneSet, self).save(*args, **kwargs)


    def delete(self):
        """Delete this drone set; restricted to superusers.

        @raises Exception: If the current user is not a superuser.
        """
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may delete drone sets')
        super(DroneSet, self).delete()


    @classmethod
    def drone_sets_enabled(cls):
        """Returns whether drone sets are enabled.

        @param cls: Implicit class object.
        """
        return cls.DRONE_SETS_ENABLED


    @classmethod
    def default_drone_set_name(cls):
        """Returns the default drone set name.

        @param cls: Implicit class object.
        """
        return cls.DEFAULT_DRONE_SET_NAME


    @classmethod
    def get_default(cls):
        """Gets the default drone set name, compatible with Job.add_object.

        @param cls: Implicit class object.
        """
        return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME)


    @classmethod
    def resolve_name(cls, drone_set_name):
        """
        Returns the name of one of these, if not None, in order of preference:
        1) the drone set given,
        2) the current user's default drone set, or
        3) the global default drone set

        or returns None if drone sets are disabled

        @param cls: Implicit class object.
        @param drone_set_name: A drone set name.
        """
        if not cls.drone_sets_enabled():
            return None

        user = User.current_user()
        user_drone_set_name = user.drone_set and user.drone_set.name

        return drone_set_name or user_drone_set_name or cls.get_default().name


    def get_drone_hostnames(self):
        """
        Gets the hostnames of all drones in this drone set
        """
        return set(self.drones.all().values_list('hostname', flat=True))


    class Meta:
        """Metadata for class DroneSet."""
        db_table = 'afe_drone_sets'

    def __unicode__(self):
        return unicode(self.name)
316
317
class User(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    login :user login name

    Optional:
    access_level: 0=User (default), 1=Admin, 100=Root
    """
    ACCESS_ROOT = 100
    ACCESS_ADMIN = 1
    ACCESS_USER = 0

    # Login name used for system-initiated actions when no real user is bound
    # to the current thread (see current_user()).
    AUTOTEST_SYSTEM = 'autotest_system'

    login = dbmodels.CharField(max_length=255, unique=True)
    access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True)

    # user preferences
    reboot_before = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootBefore.choices(), blank=True,
        default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootAfter.choices(), blank=True,
        default=DEFAULT_REBOOT_AFTER)
    drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)
    show_experimental = dbmodels.BooleanField(default=False)

    name_field = 'login'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save this user; non-superusers may only modify their own record.

        Newly created users are automatically added to the 'Everyone'
        ACL group.

        @raises AclAccessViolation: If a non-superuser attempts to modify
                a different user.
        """
        # is this a new object being saved for the first time?
        first_time = (self.id is None)
        user = thread_local.get_user()
        if user and not user.is_superuser() and user.login != self.login:
            raise AclAccessViolation("You cannot modify user " + self.login)
        super(User, self).save(*args, **kwargs)
        if first_time:
            everyone = AclGroup.objects.get(name='Everyone')
            everyone.users.add(self)


    def is_superuser(self):
        """Returns whether the user has superuser access."""
        return self.access_level >= self.ACCESS_ROOT


    @classmethod
    def current_user(cls):
        """Returns the current user.

        @param cls: Implicit class object.
        """
        user = thread_local.get_user()
        if user is None:
            # No user bound to this thread: fall back to (and lazily create)
            # the system user with root access.
            user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM)
            user.access_level = cls.ACCESS_ROOT
            user.save()
        return user


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Check for a record with matching id and login. If one exists,
        return it. If one does not exist there is a possibility that
        the following cases have happened:
        1. Same id, different login
            We received: "1 chromeos-test"
            And we have: "1 debug-user"
        In this case we need to delete "1 debug_user" and insert
        "1 chromeos-test".

        2. Same login, different id:
            We received: "1 chromeos-test"
            And we have: "2 chromeos-test"
        In this case we need to delete "2 chromeos-test" and insert
        "1 chromeos-test".

        As long as this method deletes bad records and raises the
        DoesNotExist exception the caller will handle creating the
        new record.

        @raises: DoesNotExist, if a record with the matching login and id
                does not exist.
        """

        # Both the id and login should be unique but there are cases when
        # we might already have a user with the same login/id because
        # current_user will proactively create a user record if it doesn't
        # exist. Since we want to avoid conflict between the master and
        # shard, just delete any existing user records that don't match
        # what we're about to deserialize from the master.
        try:
            return cls.objects.get(login=data['login'], id=data['id'])
        except cls.DoesNotExist:
            cls.delete_matching_record(login=data['login'])
            cls.delete_matching_record(id=data['id'])
            raise


    class Meta:
        """Metadata for class User."""
        db_table = 'afe_users'

    def __unicode__(self):
        return unicode(self.login)
427
428
class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel,
           model_logic.ModelWithAttributes):
    """\
    Required:
    hostname

    optional:
    locked: if true, host is locked and will not be queued

    Internal:
    From AbstractHostModel:
        status: string describing status of host
        invalid: true if the host has been deleted
        protection: indicates what can be done to this host during repair
        lock_time: DateTime at which the host was locked
        dirty: true if the host has been used without being rebooted
    Local:
        locked_by: user that locked the host, or null if the host is unlocked
    """

    SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set',
                                         'hostattribute_set',
                                         'labels',
                                         'shard'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid'])


    def custom_deserialize_relation(self, link, data):
        """Deserialize a custom relation; only 'shard' is supported.

        @param link: The relation name being deserialized; must be 'shard'.
        @param data: Serialized data of the linked Shard object.
        """
        assert link == 'shard', 'Link %s should not be deserialized' % link
        self.shard = Shard.deserialize(data)


    # Note: Only specify foreign keys here, specify all native host columns in
    # rdb_model_extensions instead.
    Protection = host_protections.Protection
    labels = dbmodels.ManyToManyField(Label, blank=True,
                                      db_table='afe_hosts_labels')
    locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False)
    name_field = 'hostname'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    leased_objects = model_logic.LeasedHostManager()

    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    def __init__(self, *args, **kwargs):
        super(Host, self).__init__(*args, **kwargs)
        # Track 'status' changes so on_attribute_changed() is notified.
        self._record_attributes(['status'])


    @staticmethod
    def create_one_time_host(hostname):
        """Creates a one-time host.

        @param hostname: The name for the host.

        @raises model_logic.ValidationError: If a valid (non-invalid) host
                with this hostname already exists.
        @returns The new (or resurrected) one-time Host, saved and with all
                 relations cleared.
        """
        query = Host.objects.filter(hostname=hostname)
        if query.count() == 0:
            host = Host(hostname=hostname, invalid=True)
            host.do_validate()
        else:
            host = query[0]
            if not host.invalid:
                raise model_logic.ValidationError({
                    'hostname' : '%s already exists in the autotest DB.  '
                        'Select it rather than entering it as a one time '
                        'host.' % hostname
                    })
        host.protection = host_protections.Protection.DO_NOT_REPAIR
        host.locked = False
        host.save()
        host.clean_object()
        return host


    @classmethod
    def assign_to_shard(cls, shard, known_ids):
        """Assigns hosts to a shard.

        For all labels that have been assigned to a shard, all hosts that
        have at least one of the shard's labels are assigned to the shard.
        Hosts that are assigned to the shard but aren't already present on the
        shard are returned.

        Board to shard mapping is many-to-one. Many different boards can be
        hosted in a shard. However, DUTs of a single board cannot be distributed
        into more than one shard.

        @param shard: The shard object to assign labels/hosts for.
        @param known_ids: List of all host-ids the shard already knows.
                          This is used to figure out which hosts should be sent
                          to the shard. If shard_ids were used instead, hosts
                          would only be transferred once, even if the client
                          failed persisting them.
                          The number of hosts usually lies in O(100), so the
                          overhead is acceptable.

        @returns the hosts objects that should be sent to the shard.
        """

        # Disclaimer: concurrent heartbeats should theoretically not occur in
        # the current setup. As they may be introduced in the near future,
        # this comment will be left here.

        # Sending stuff twice is acceptable, but forgetting something isn't.
        # Detecting duplicates on the client is easy, but here it's harder. The
        # following options were considered:
        # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more
        #   than select returned, as concurrently more hosts might have been
        #   inserted
        # - UPDATE and then SELECT WHERE shard=shard: select always returns all
        #   hosts for the shard, this is overhead
        # - SELECT and then UPDATE only selected without requerying afterwards:
        #   returns the old state of the records.
        host_ids = set(Host.objects.filter(
            labels__in=shard.labels.all(),
            leased=False
            ).exclude(
            id__in=known_ids,
            ).values_list('pk', flat=True))

        if host_ids:
            Host.objects.filter(pk__in=host_ids).update(shard=shard)
            # Re-query so callers see the post-update state of the rows.
            return list(Host.objects.filter(pk__in=host_ids).all())
        return []

    def resurrect_object(self, old_object):
        """Restore state when re-adding a previously-deleted host.

        @param old_object: The invalid Host record being resurrected.
        """
        super(Host, self).resurrect_object(old_object)
        # invalid hosts can be in use by the scheduler (as one-time hosts), so
        # don't change the status
        self.status = old_object.status


    def clean_object(self):
        """Clear ACL and label relations before this host is invalidated."""
        self.aclgroup_set.clear()
        self.labels.clear()


    def record_state(self, type_str, state, value, other_metadata=None):
        """Record metadata in elasticsearch.

        @param type_str: sets the _type field in elasticsearch db.
        @param state: string representing what state we are recording,
                      e.g. 'locked'
        @param value: value of the state, e.g. True
        @param other_metadata: Other metadata to store in metaDB.
        """
        metadata = {
            state: value,
            'hostname': self.hostname,
        }
        if other_metadata:
            # metadata is a fresh local dict, so updating it in place is safe
            # and avoids the Python-2-only items()-concatenation merge.
            metadata.update(other_metadata)
        autotest_es.post(use_http=True, type_str=type_str, metadata=metadata)


    def save(self, *args, **kwargs):
        """Save the host, recording lock transitions to the metadata DB.

        New hosts are automatically added to the 'Everyone' ACL group.

        @raises AclAccessViolation: If the current user may not modify
                this host (existing hosts only).
        """
        # extra spaces in the hostname can be a sneaky source of errors
        self.hostname = self.hostname.strip()
        # is this a new object being saved for the first time?
        first_time = (self.id is None)
        if not first_time:
            AclGroup.check_for_acl_violation_hosts([self])
        # If locked is changed, send its status and user made the change to
        # metaDB. Locks are important in host history because if a device is
        # locked then we don't really care what state it is in.
        if self.locked and not self.locked_by:
            self.locked_by = User.current_user()
            if not self.lock_time:
                self.lock_time = datetime.now()
            self.record_state('lock_history', 'locked', self.locked,
                              {'changed_by': self.locked_by.login,
                               'lock_reason': self.lock_reason})
            self.dirty = True
        elif not self.locked and self.locked_by:
            self.record_state('lock_history', 'locked', self.locked,
                              {'changed_by': self.locked_by.login})
            self.locked_by = None
            self.lock_time = None
        super(Host, self).save(*args, **kwargs)
        if first_time:
            everyone = AclGroup.objects.get(name='Everyone')
            everyone.hosts.add(self)
        self._check_for_updated_attributes()


    def delete(self):
        """Delete the host, aborting any queue entries that reference it.

        @raises AclAccessViolation: If the current user may not modify
                this host.
        """
        AclGroup.check_for_acl_violation_hosts([self])
        for queue_entry in self.hostqueueentry_set.all():
            queue_entry.deleted = True
            queue_entry.abort()
        super(Host, self).delete()


    def on_attribute_changed(self, attribute, old_value):
        """Log host status transitions (only 'status' is tracked).

        @param attribute: Name of the changed attribute; must be 'status'.
        @param old_value: The attribute's previous value (unused).
        """
        assert attribute == 'status'
        logging.info(self.hostname + ' -> ' + self.status)


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on this host.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        queue_entry = HostQueueEntry.create(host=self, job=job,
                                            is_template=is_template)
        # allow recovery of dead hosts from the frontend
        if not self.active_queue_entry() and self.is_dead():
            self.status = Host.Status.READY
            self.save()
        queue_entry.save()

        block = IneligibleHostQueue(job=job, host=self)
        block.save()


    def platform(self):
        """The platform of the host."""
        # TODO(showard): slightly hacky?
        platforms = self.labels.filter(platform=True)
        if not platforms:
            return None
        return platforms[0]
    platform.short_description = 'Platform'


    @classmethod
    def check_no_platform(cls, hosts):
        """Verify the specified hosts have no associated platforms.

        @param cls: Implicit class object.
        @param hosts: The hosts to verify.
        @raises model_logic.ValidationError if any hosts already have a
            platform.
        """
        Host.objects.populate_relationships(hosts, Label, 'label_list')
        errors = []
        for host in hosts:
            platforms = [label.name for label in host.label_list
                         if label.platform]
            if platforms:
                # do a join, just in case this host has multiple platforms,
                # we'll be able to see it
                errors.append('Host %s already has a platform: %s' % (
                              host.hostname, ', '.join(platforms)))
        if errors:
            raise model_logic.ValidationError({'labels': '; '.join(errors)})


    @classmethod
    def check_board_labels_allowed(cls, hosts, new_labels=None):
        """Verify the specified hosts have valid board labels and the given
        new board labels can be added.

        @param cls: Implicit class object.
        @param hosts: The hosts to verify.
        @param new_labels: A list of labels to be added to the hosts; defaults
                to no new labels.

        @raises model_logic.ValidationError if any host has invalid board labels
                or the given board labels cannot be added to the hosts.
        """
        # Use a None sentinel instead of a mutable default argument.
        if new_labels is None:
            new_labels = []
        Host.objects.populate_relationships(hosts, Label, 'label_list')
        errors = []
        for host in hosts:
            boards = [label.name for label in host.label_list
                      if label.name.startswith('board:')]
            if not server_utils.board_labels_allowed(boards + new_labels):
                # do a join, just in case this host has multiple boards,
                # we'll be able to see it
                errors.append('Host %s already has board labels: %s' % (
                              host.hostname, ', '.join(boards)))
        if errors:
            raise model_logic.ValidationError({'labels': '; '.join(errors)})


    def is_dead(self):
        """Returns whether the host is dead (has status repair failed)."""
        return self.status == Host.Status.REPAIR_FAILED


    def active_queue_entry(self):
        """Returns the active queue entry for this host, or None if none."""
        active = list(self.hostqueueentry_set.filter(active=True))
        if not active:
            return None
        assert len(active) == 1, ('More than one active entry for '
                                  'host ' + self.hostname)
        return active[0]


    def _get_attribute_model_and_args(self, attribute):
        # Hook for ModelWithAttributes: keyvals live in HostAttribute rows
        # keyed by (host, attribute).
        return HostAttribute, dict(host=self, attribute=attribute)


    @classmethod
    def get_attribute_model(cls):
        """Return the attribute model.

        Override method in parent class. See ModelExtensions for details.
        @returns: The attribute model of Host.
        """
        return HostAttribute


    class Meta:
        """Metadata for the Host class."""
        db_table = 'afe_hosts'


    def __unicode__(self):
        return unicode(self.hostname)
741
742
class HostAttribute(dbmodels.Model, model_logic.ModelExtensions):
    """Arbitrary keyvals associated with hosts."""

    SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])
    host = dbmodels.ForeignKey(Host)
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for the HostAttribute class."""
        db_table = 'afe_host_attributes'


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use host_id and attribute to search for an existing record.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        # TODO(fdeng): We should use host_id and attribute together as
        #              a primary key in the db.
        return cls.objects.get(host_id=data['host_id'],
                               attribute=data['attribute'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on master and shards.

        @param data: A dictionary of data to deserialize.

        @returns: A HostAttribute object.
        """
        if data:
            # Drop 'id' if present.  Using a default avoids a KeyError for
            # payloads that were serialized without an id field.
            data.pop('id', None)
        return super(HostAttribute, cls).deserialize(data)
787
788
class Test(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    author: author name
    description: description of the test
    name: test name
    time: short, medium, long
    test_class: This describes the class the test belongs in.
    test_category: This describes the category for the test
    test_type: Client or Server
    path: path to pass to run_test()
    sync_count:  is a number >=1 (1 being the default). If it's 1, then it's an
                 async job. If it's >1 it's sync job for that number of machines
                 i.e. if sync_count = 2 it is a sync job that requires two
                 machines.
    Optional:
    dependencies: What the test requires to run. Comma deliminated list
    dependency_labels: many-to-many relationship with labels corresponding to
                       test dependencies.
    experimental: If this is set to True production servers will ignore the test
    run_verify: Whether or not the scheduler should run the verify stage
    run_reset: Whether or not the scheduler should run the reset stage
    test_retry: Number of times to retry test if the test did not complete
                successfully. (optional, default: 0)
    """
    TestTime = enum.Enum('SHORT', 'MEDIUM', 'LONG', start_value=1)

    name = dbmodels.CharField(max_length=255, unique=True)
    author = dbmodels.CharField(max_length=255)
    test_class = dbmodels.CharField(max_length=255)
    test_category = dbmodels.CharField(max_length=255)
    dependencies = dbmodels.CharField(max_length=255, blank=True)
    description = dbmodels.TextField(blank=True)
    experimental = dbmodels.BooleanField(default=True)
    run_verify = dbmodels.BooleanField(default=False)
    test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(),
                                           default=TestTime.MEDIUM)
    test_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices())
    sync_count = dbmodels.IntegerField(default=1)
    path = dbmodels.CharField(max_length=255, unique=True)
    test_retry = dbmodels.IntegerField(blank=True, default=0)
    run_reset = dbmodels.BooleanField(default=True)

    dependency_labels = (
        dbmodels.ManyToManyField(Label, blank=True,
                                 db_table='afe_autotests_dependency_labels'))
    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def admin_description(self):
        """Returns a string representing the admin description."""
        # Escape the description so it renders safely as HTML in the admin UI.
        escaped_description = saxutils.escape(self.description)
        return '<span style="white-space:pre">%s</span>' % escaped_description
    admin_description.allow_tags = True
    admin_description.short_description = 'Description'


    class Meta:
        """Metadata for class Test."""
        db_table = 'afe_autotests'

    def __unicode__(self):
        return unicode(self.name)
854
855
class TestParameter(dbmodels.Model):
    """
    A declared parameter of a test
    """
    # The test this parameter belongs to.
    test = dbmodels.ForeignKey(Test)
    # Parameter name; unique per test (see Meta.unique_together).
    name = dbmodels.CharField(max_length=255)

    class Meta:
        """Metadata for class TestParameter."""
        db_table = 'afe_test_parameters'
        # A parameter name may only be declared once per test.
        unique_together = ('test', 'name')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.test.name)
870
871
class Profiler(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    name: profiler name

    Optional:
    description: arbitrary text description
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    class Meta:
        """Metadata for class Profiler."""
        db_table = 'afe_profilers'

    def __unicode__(self):
        return unicode(self.name)
894
895
class AclGroup(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    name: name of ACL group

    Optional:
    description: arbitrary description of group
    """

    SERIALIZATION_LINKS_TO_FOLLOW = set(['users'])

    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.CharField(max_length=255, blank=True)
    users = dbmodels.ManyToManyField(User, blank=False,
                                     db_table='afe_acl_groups_users')
    hosts = dbmodels.ManyToManyField(Host, blank=True,
                                     db_table='afe_acl_groups_hosts')

    name_field = 'name'
    objects = model_logic.ExtendedManager()

    @staticmethod
    def check_for_acl_violation_hosts(hosts):
        """Verify the current user has access to the specified hosts.

        @param hosts: The hosts to verify against.
        @raises AclAccessViolation if the current user doesn't have access
            to a host.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        accessible_host_ids = set(
            host.id for host in Host.objects.filter(aclgroup__users=user))
        for host in hosts:
            # Check if the user has access to this host,
            # but only if it is not a metahost or a one-time-host.
            no_access = (isinstance(host, Host)
                         and not host.invalid
                         and int(host.id) not in accessible_host_ids)
            if no_access:
                raise AclAccessViolation("%s does not have access to %s" %
                                         (str(user), str(host)))


    @staticmethod
    def check_abort_permissions(queue_entries):
        """Look for queue entries that aren't abortable by the current user.

        An entry is not abortable if:
           * the job isn't owned by this user, and
           * the machine isn't ACL-accessible, or
           * the machine is in the "Everyone" ACL

        @param queue_entries: The queue entries to check.
        @raises AclAccessViolation if a queue entry is not abortable by the
            current user.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        not_owned = queue_entries.exclude(job__owner=user.login)
        # I do this using ID sets instead of just Django filters because
        # filtering on M2M dbmodels is broken in Django 0.96. It's better in
        # 1.0.
        # TODO: Use Django filters, now that we're using 1.0.
        accessible_ids = set(
            entry.id for entry
            in not_owned.filter(host__aclgroup__users__login=user.login))
        public_ids = set(entry.id for entry
                         in not_owned.filter(host__aclgroup__name='Everyone'))
        cannot_abort = [entry for entry in not_owned.select_related()
                        if entry.id not in accessible_ids
                        or entry.id in public_ids]
        # Idiomatic emptiness test (was: len(cannot_abort) == 0).
        if not cannot_abort:
            return
        entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner,
                                              entry.host_or_metahost_name())
                                for entry in cannot_abort)
        raise AclAccessViolation('You cannot abort the following job entries: '
                                 + entry_names)


    def check_for_acl_violation_acl_group(self):
        """Verifies the current user has access to this ACL group.

        @raises AclAccessViolation if the current user doesn't have access to
            this ACL group.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        if self.name == 'Everyone':
            raise AclAccessViolation("You cannot modify 'Everyone'!")
        # PEP 8 membership test (was: `not user in ...`).
        if user not in self.users.all():
            raise AclAccessViolation("You do not have access to %s"
                                     % self.name)

    @staticmethod
    def on_host_membership_change():
        """Invoked when host membership changes."""
        everyone = AclGroup.objects.get(name='Everyone')

        # find hosts that aren't in any ACL group and add them to Everyone
        # TODO(showard): this is a bit of a hack, since the fact that this query
        # works is kind of a coincidence of Django internals.  This trick
        # doesn't work in general (on all foreign key relationships).  I'll
        # replace it with a better technique when the need arises.
        orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True)
        everyone.hosts.add(*orphaned_hosts.distinct())

        # find hosts in both Everyone and another ACL group, and remove them
        # from Everyone
        hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone')
        acled_hosts = set()
        for host in hosts_in_everyone:
            # Has an ACL group other than Everyone
            if host.aclgroup_set.count() > 1:
                acled_hosts.add(host)
        everyone.hosts.remove(*acled_hosts)


    def delete(self):
        """Deletes this group and rebalances 'Everyone' membership.

        @raises AclAccessViolation if this is the 'Everyone' group, or the
            current user doesn't have access to this group.
        """
        if self.name == 'Everyone':
            raise AclAccessViolation("You cannot delete 'Everyone'!")
        self.check_for_acl_violation_acl_group()
        super(AclGroup, self).delete()
        self.on_host_membership_change()


    def add_current_user_if_empty(self):
        """Adds the current user if the set of users is empty."""
        if not self.users.count():
            self.users.add(User.current_user())


    def perform_after_save(self, change):
        """Called after a save.

        @param change: Whether there was a change.
        """
        if not change:
            self.users.add(User.current_user())
        self.add_current_user_if_empty()
        self.on_host_membership_change()


    def save(self, *args, **kwargs):
        """Saves this group after an ACL check on the stored original.

        @param args: Non-keyword arguments passed through to Model.save.
        @param kwargs: Keyword arguments passed through to Model.save.
        @raises AclAccessViolation if updating a group the current user
            may not modify.
        """
        change = bool(self.id)
        if change:
            # Check the original object for an ACL violation
            AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group()
        super(AclGroup, self).save(*args, **kwargs)
        self.perform_after_save(change)


    class Meta:
        """Metadata for class AclGroup."""
        db_table = 'afe_acl_groups'

    def __unicode__(self):
        return unicode(self.name)
1058
1059
class ParameterizedJob(dbmodels.Model):
    """
    Auxiliary configuration for a parameterized job.

    This class is obsolete, and ought to be dead.  Due to a series of
    unfortunate events, it can't be deleted:
      * In `class Job` we're required to keep a reference to this class
        for the sake of the scheduler unit tests.
      * The existence of the reference in `Job` means that certain
        methods here will get called from the `get_jobs` RPC.
    So, the definitions below seem to be the minimum stub we can support
    unless/until we change the database schema.
    """

    @classmethod
    def smart_get(cls, id_or_name, *args, **kwargs):
        """For compatibility with Job.add_object.

        @param cls: Implicit class object.
        @param id_or_name: The ID or name to get.
        @param args: Non-keyword arguments.
        @param kwargs: Keyword arguments.
        """
        return cls.objects.get(pk=id_or_name)


    def job(self):
        """Returns the job if it exists, or else None."""
        jobs = self.job_set.all()
        assert jobs.count() <= 1
        # Conditional expression instead of the fragile
        # `jobs and jobs[0] or None` idiom, which would yield None
        # whenever jobs[0] happened to be falsy.
        return jobs[0] if jobs else None


    class Meta:
        """Metadata for class ParameterizedJob."""
        db_table = 'afe_parameterized_jobs'

    def __unicode__(self):
        # NOTE(review): this stub declares no `test` field, so `self.test`
        # looks like it would raise AttributeError here -- confirm whether
        # this repr is ever exercised.
        return u'%s (parameterized) - %s' % (self.test.name, self.job())
1099
1100
class JobManager(model_logic.ExtendedManager):
    'Custom manager to provide efficient status counts querying.'
    def get_status_counts(self, job_ids):
        """Returns a dict mapping the given job IDs to their status count dicts.

        @param job_ids: A list of job IDs.

        @returns: A dict {job_id: {full_status: count, ...}, ...}.  Every
            requested id is present; a job with no host queue entries maps
            to an empty dict.
        """
        if not job_ids:
            return {}
        # Pass the ids as query parameters so the database driver handles
        # quoting/escaping, instead of interpolating them into the SQL text.
        placeholders = ','.join(['%s'] * len(job_ids))
        cursor = connection.cursor()
        cursor.execute("""
            SELECT job_id, status, aborted, complete, COUNT(*)
            FROM afe_host_queue_entries
            WHERE job_id IN (%s)
            GROUP BY job_id, status, aborted, complete
            """ % placeholders, list(job_ids))
        all_job_counts = dict((job_id, {}) for job_id in job_ids)
        for job_id, status, aborted, complete, count in cursor.fetchall():
            job_dict = all_job_counts[job_id]
            full_status = HostQueueEntry.compute_full_status(status, aborted,
                                                             complete)
            job_dict.setdefault(full_status, 0)
            job_dict[full_status] += count
        return all_job_counts
1126
1127
class Job(dbmodels.Model, model_logic.ModelExtensions):
    """\
    owner: username of job owner
    name: job name (does not have to be unique)
    priority: Integer priority value.  Higher is more important.
    control_file: contents of control file
    control_type: Client or Server
    created_on: date of job creation
    submitted_on: date of job submission
    synch_count: how many hosts should be used per autoserv execution
    run_verify: Whether or not to run the verify phase
    run_reset: Whether or not to run the reset phase
    timeout: DEPRECATED - hours from queuing time until job times out
    timeout_mins: minutes from job queuing time until the job times out
    max_runtime_hrs: DEPRECATED - hours from job starting time until job
                     times out
    max_runtime_mins: minutes from job starting time until job times out
    email_list: list of people to email on completion delimited by any of:
                white space, ',', ':', ';'
    dependency_labels: many-to-many relationship with labels corresponding to
                       job dependencies
    reboot_before: Never, If dirty, or Always
    reboot_after: Never, If all tests passed, or Always
    parse_failed_repair: if True, a failed repair launched by this job will
                         have its results parsed as part of the job.
    drone_set: The set of drones to run this job on
    parent_job: Parent job (optional)
    test_retry: Number of times to retry test if the test did not complete
                successfully. (optional, default: 0)
    require_ssp: Require server-side packaging unless require_ssp is set to
                 False. (optional, default: None)
    """

    # TODO: Investigate, if jobkeyval_set is really needed.
    # dynamic_suite will write them into an attached file for the drone, but
    # it doesn't seem like they are actually used. If they aren't used, remove
    # jobkeyval_set here.
    SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels',
                                         'hostqueueentry_set',
                                         'jobkeyval_set',
                                         'shard'])

    # SQL for selecting jobs that should be sent to shard.
    # We use raw sql as django filters were not optimized.
    # The following jobs are excluded by the SQL.
    #     - Non-aborted jobs known to shard as specified in |known_ids|.
    #       Note for jobs aborted on master, even if already known to shard,
    #       will be sent to shard again so that shard can abort them.
    #     - Completed jobs
    #     - Active jobs
    #     - Jobs without host_queue_entries
    NON_ABORTED_KNOWN_JOBS = '(t2.aborted = 0 AND t1.id IN (%(known_ids)s))'

    SQL_SHARD_JOBS = (
        'SELECT DISTINCT(t1.id) FROM afe_jobs t1 '
        'INNER JOIN afe_host_queue_entries t2  ON '
        '  (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 '
        '   %(check_known_jobs)s) '
        'LEFT OUTER JOIN afe_jobs_dependency_labels t3 ON (t1.id = t3.job_id) '
        'JOIN afe_shards_labels t4 '
        '  ON (t4.label_id = t3.label_id OR t4.label_id = t2.meta_host) '
        'WHERE t4.shard_id = %(shard_id)s'
        )

    # Jobs can be created with assigned hosts and have no dependency
    # labels nor meta_host.
    # We are looking for:
    #     - a job whose hqe's meta_host is null
    #     - a job whose hqe has a host
    #     - one of the host's labels matches the shard's label.
    # Non-aborted known jobs, completed jobs, active jobs, jobs
    # without hqe are excluded as we do with SQL_SHARD_JOBS.
    SQL_SHARD_JOBS_WITH_HOSTS = (
        'SELECT DISTINCT(t1.id) FROM afe_jobs t1 '
        'INNER JOIN afe_host_queue_entries t2 ON '
        '  (t1.id = t2.job_id AND t2.complete != 1 AND t2.active != 1 '
        '   AND t2.meta_host IS NULL AND t2.host_id IS NOT NULL '
        '   %(check_known_jobs)s) '
        'LEFT OUTER JOIN afe_hosts_labels t3 ON (t2.host_id = t3.host_id) '
        'WHERE (t3.label_id IN '
        '  (SELECT label_id FROM afe_shards_labels '
        '   WHERE shard_id = %(shard_id)s))'
        )

    # Even if we had filters about complete, active and aborted
    # bits in the above two SQLs, there is a chance that
    # the result may still contain a job with an hqe with 'complete=1'
    # or 'active=1' or 'aborted=0 and afe_job.id in known jobs.'
    # This happens when a job has two (or more) hqes and at least
    # one hqe has different bits than others.
    # We use a second sql to ensure we exclude all un-desired jobs.
    SQL_JOBS_TO_EXCLUDE =(
        'SELECT t1.id FROM afe_jobs t1 '
        'INNER JOIN afe_host_queue_entries t2 ON '
        '  (t1.id = t2.job_id) '
        'WHERE (t1.id in (%(candidates)s) '
        '  AND (t2.complete=1 OR t2.active=1 '
        '  %(check_known_jobs)s))'
        )

    def _deserialize_relation(self, link, data):
        """Deserialize a related object set belonging to this job.

        @param link: Name of the relation being deserialized.
        @param data: List of serialized related objects.
        """
        if link in ['hostqueueentry_set', 'jobkeyval_set']:
            # These children carry a foreign key back to this job; point
            # them at this job's id before handing off to the parent class.
            for obj in data:
                obj['job_id'] = self.id

        super(Job, self)._deserialize_relation(link, data)


    def custom_deserialize_relation(self, link, data):
        """Deserialize the 'shard' relation (the only custom-handled one).

        @param link: Name of the relation; must be 'shard'.
        @param data: Serialized Shard data.
        """
        assert link == 'shard', 'Link %s should not be deserialized' % link
        self.shard = Shard.deserialize(data)


    def sanity_check_update_from_shard(self, shard, updated_serialized):
        """Verify an update pushed by |shard| may be applied to this job.

        @param shard: The shard the update was received from.
        @param updated_serialized: Serialized version of the updated job.
        @raises error.UnallowedRecordsSentToMaster: If this job is assigned
            to a different shard than the one pushing the update.
        """
        # If the job got aborted on the master after the client fetched it
        # no shard_id will be set. The shard might still push updates though,
        # as the job might complete before the abort bit syncs to the shard.
        # Alternative considered: The master scheduler could be changed to not
        # set aborted jobs to completed that are sharded out. But that would
        # require database queries and seemed more complicated to implement.
        # This seems safe to do, as there won't be updates pushed from the
        # wrong shard: shards should be powered off and wiped when they are
        # removed from the master.
        if self.shard_id and self.shard_id != shard.id:
            raise error.UnallowedRecordsSentToMaster(
                'Job id=%s is assigned to shard (%s). Cannot update it with %s '
                'from shard %s.' % (self.id, self.shard_id, updated_serialized,
                                    shard.id))


    # TIMEOUT is deprecated.
    DEFAULT_TIMEOUT = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_default', default=24)
    DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60)
    # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is
    # completed.
    DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72)
    DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60)
    DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool,
        default=False)

    owner = dbmodels.CharField(max_length=255)
    name = dbmodels.CharField(max_length=255)
    priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT)
    control_file = dbmodels.TextField(null=True, blank=True)
    control_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices(),
        blank=True, # to allow 0
        default=control_data.CONTROL_TYPE.CLIENT)
    created_on = dbmodels.DateTimeField()
    synch_count = dbmodels.IntegerField(blank=True, default=0)
    timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT)
    run_verify = dbmodels.BooleanField(default=False)
    email_list = dbmodels.CharField(max_length=250, blank=True)
    dependency_labels = (
            dbmodels.ManyToManyField(Label, blank=True,
                                     db_table='afe_jobs_dependency_labels'))
    reboot_before = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootBefore.choices(), blank=True,
        default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootAfter.choices(), blank=True,
        default=DEFAULT_REBOOT_AFTER)
    parse_failed_repair = dbmodels.BooleanField(
        default=DEFAULT_PARSE_FAILED_REPAIR)
    # max_runtime_hrs is deprecated. Will be removed after switch to mins is
    # completed.
    max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS)
    max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS)
    drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)

    # TODO(jrbarnette)  We have to keep `parameterized_job` around or it
    # breaks the scheduler_models unit tests (and fixing the unit tests
    # will break the scheduler, so don't do that).
    #
    # The ultimate fix is to delete the column from the database table
    # at which point, you _must_ delete this.  Until you're ready to do
    # that, DON'T MUCK WITH IT.
    parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True,
                                            blank=True)

    parent_job = dbmodels.ForeignKey('self', blank=True, null=True)

    test_retry = dbmodels.IntegerField(blank=True, default=0)

    run_reset = dbmodels.BooleanField(default=True)

    timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS)

    # If this is None on the master, a slave should be found.
    # If this is None on a slave, it should be synced back to the master
    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    # If this is None, server-side packaging will be used for server side test,
    # unless it's disabled in global config AUTOSERV/enable_ssp_container.
    require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True)

    # custom manager
    objects = JobManager()


    @decorators.cached_property
    def labels(self):
        """All the labels of this job"""
        # We need to convert dependency_labels to a list, because all() gives us
        # back an iterator, and storing/caching an iterator means we'd only be
        # able to read from it once.
        return list(self.dependency_labels.all())


    def is_server_job(self):
        """Returns whether this job is of type server."""
        return self.control_type == control_data.CONTROL_TYPE.SERVER


    @classmethod
    def create(cls, owner, options, hosts):
        """Creates a job.

        The job is created by taking some information (the listed args) and
        filling in the rest of the necessary information.

        @param cls: Implicit class object.
        @param owner: The owner for the job.
        @param options: An options object.
        @param hosts: The hosts to use.
        """
        AclGroup.check_for_acl_violation_hosts(hosts)

        control_file = options.get('control_file')

        # Fall back to the user's reboot preferences when the request
        # doesn't specify reboot_before/reboot_after explicitly.
        user = User.current_user()
        if options.get('reboot_before') is None:
            options['reboot_before'] = user.get_reboot_before_display()
        if options.get('reboot_after') is None:
            options['reboot_after'] = user.get_reboot_after_display()

        drone_set = DroneSet.resolve_name(options.get('drone_set'))

        # Derive the minutes-based timeout from the deprecated hours-based
        # one when only the latter was supplied.
        if options.get('timeout_mins') is None and options.get('timeout'):
            options['timeout_mins'] = options['timeout'] * 60

        job = cls.add_object(
            owner=owner,
            name=options['name'],
            priority=options['priority'],
            control_file=control_file,
            control_type=options['control_type'],
            synch_count=options.get('synch_count'),
            # timeout needs to be deleted in the future.
            timeout=options.get('timeout'),
            timeout_mins=options.get('timeout_mins'),
            max_runtime_mins=options.get('max_runtime_mins'),
            run_verify=options.get('run_verify'),
            email_list=options.get('email_list'),
            reboot_before=options.get('reboot_before'),
            reboot_after=options.get('reboot_after'),
            parse_failed_repair=options.get('parse_failed_repair'),
            created_on=datetime.now(),
            drone_set=drone_set,
            parent_job=options.get('parent_job_id'),
            test_retry=options.get('test_retry'),
            run_reset=options.get('run_reset'),
            require_ssp=options.get('require_ssp'))

        job.dependency_labels = options['dependencies']

        if options.get('keyvals'):
            for key, value in options['keyvals'].iteritems():
                JobKeyval.objects.create(job=job, key=key, value=value)

        return job


    @classmethod
    def assign_to_shard(cls, shard, known_ids):
        """Assigns unassigned jobs to a shard.

        For all labels that have been assigned to this shard, all jobs that
        have this label, are assigned to this shard.

        Jobs that are assigned to the shard but aren't already present on the
        shard are returned.

        @param shard: The shard to assign jobs to.
        @param known_ids: List of all ids of incomplete jobs, the shard already
                          knows about.
                          This is used to figure out which jobs should be sent
                          to the shard. If shard_ids were used instead, jobs
                          would only be transferred once, even if the client
                          failed persisting them.
                          The number of unfinished jobs usually lies in O(1000).
                          Assuming one id takes 8 chars in the json, this means
                          overhead that lies in the lower kilobyte range.
                          A not in query with 5000 id's takes about 30ms.

        @returns The job objects that should be sent to the shard.
        """
        # Disclaimer: Concurrent heartbeats should not occur in today's setup.
        # If this changes or they are triggered manually, this applies:
        # Jobs may be returned more than once by concurrent calls of this
        # function, as there is a race condition between SELECT and UPDATE.
        job_ids = set([])
        check_known_jobs_exclude = ''
        check_known_jobs_include = ''

        if known_ids:
            check_known_jobs = (
                    cls.NON_ABORTED_KNOWN_JOBS %
                    {'known_ids': ','.join([str(i) for i in known_ids])})
            check_known_jobs_exclude = 'AND NOT ' + check_known_jobs
            check_known_jobs_include = 'OR ' + check_known_jobs

        # Pass 1: collect candidate job ids from both shard-assignment
        # queries (by dependency label and by host label).
        for sql in [cls.SQL_SHARD_JOBS, cls.SQL_SHARD_JOBS_WITH_HOSTS]:
            query = Job.objects.raw(sql % {
                    'check_known_jobs': check_known_jobs_exclude,
                    'shard_id': shard.id})
            job_ids |= set([j.id for j in query])

        # Pass 2: drop candidates that have any complete/active hqe (see
        # the comment above SQL_JOBS_TO_EXCLUDE).
        if job_ids:
            query = Job.objects.raw(
                    cls.SQL_JOBS_TO_EXCLUDE %
                    {'check_known_jobs': check_known_jobs_include,
                     'candidates': ','.join([str(i) for i in job_ids])})
            job_ids -= set([j.id for j in query])

        if job_ids:
            Job.objects.filter(pk__in=job_ids).update(shard=shard)
            return list(Job.objects.filter(pk__in=job_ids).all())
        return []


    def queue(self, hosts, is_template=False):
        """Enqueue a job on the given hosts.

        @param hosts: The hosts to use.
        @param is_template: Whether the status should be "Template".
        """
        if not hosts:
            # hostless job
            entry = HostQueueEntry.create(job=self, is_template=is_template)
            entry.save()
            return

        for host in hosts:
            host.enqueue_job(self, is_template=is_template)


    def user(self):
        """Gets the user of this job, or None if it doesn't exist."""
        try:
            return User.objects.get(login=self.owner)
        except self.DoesNotExist:
            return None


    def abort(self):
        """Aborts this job."""
        for queue_entry in self.hostqueueentry_set.all():
            queue_entry.abort()


    def tag(self):
        """Returns a string tag for this job."""
        return server_utils.get_job_tag(self.id, self.owner)


    def keyval_dict(self):
        """Returns all keyvals for this job as a dictionary."""
        return dict((keyval.key, keyval.value)
                    for keyval in self.jobkeyval_set.all())


    @classmethod
    def get_attribute_model(cls):
        """Return the attribute model.

        Override method in parent class. This class is called when
        deserializing the one-to-many relationship betwen Job and JobKeyval.
        On deserialization, we will try to clear any existing job keyvals
        associated with a job to avoid any inconsistency.
        Though Job doesn't implement ModelWithAttribute, we still treat
        it as an attribute model for this purpose.

        @returns: The attribute model of Job.
        """
        return JobKeyval


    class Meta:
        """Metadata for class Job."""
        db_table = 'afe_jobs'

    def __unicode__(self):
        return u'%s (%s-%s)' % (self.name, self.id, self.owner)
1527
1528
class JobKeyval(dbmodels.Model, model_logic.ModelExtensions):
    """Keyvals associated with jobs"""

    SERIALIZATION_LINKS_TO_KEEP = set(['job'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])

    job = dbmodels.ForeignKey(Job)
    key = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use job_id and key to search for an existing record.

        @param data: A dictionary with 'job_id' and 'key' entries.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        # TODO(fdeng): We should use job_id and key together as
        #              a primary key in the db.
        return cls.objects.get(job_id=data['job_id'], key=data['key'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on master and shards.

        @param data: A dictionary of data to deserialize.

        @returns: A JobKeyval object.
        """
        if data:
            # Use a default so records serialized without an 'id' key don't
            # raise KeyError (was: data.pop('id')).
            data.pop('id', None)
        return super(JobKeyval, cls).deserialize(data)


    class Meta:
        """Metadata for class JobKeyval."""
        db_table = 'afe_job_keyvals'
1574
1575
class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions):
    """Represents an ineligible host queue."""
    # A (job, host) pair; presumably marks a host that should not be
    # scheduled for the given job — confirm against scheduler usage.
    job = dbmodels.ForeignKey(Job)
    host = dbmodels.ForeignKey(Host)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class IneligibleHostQueue."""
        db_table = 'afe_ineligible_host_queues'
1586
1587
class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
    """Represents a host queue entry."""

    # Serialization policy for master<->shard syncing:
    #  - meta_host labels are serialized in full (followed),
    #  - the host link is kept as a reference only,
    #  - the 'aborted' flag may be updated locally after the initial sync.
    SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host'])
    SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted'])


    def custom_deserialize_relation(self, link, data):
        """Deserialize the meta_host relation into a Label.

        Only the 'meta_host' link is supported.

        @param link: Name of the relation being deserialized.
        @param data: Serialized Label data.
        """
        assert link == 'meta_host'
        self.meta_host = Label.deserialize(data)


    def sanity_check_update_from_shard(self, shard, updated_serialized,
                                       job_ids_sent):
        """Validate that an update from a shard references a known job.

        @param shard: The shard the update came from.
        @param updated_serialized: The serialized HostQueueEntry data.
        @param job_ids_sent: Job ids the shard included in this update.

        @raises error.UnallowedRecordsSentToMaster: If this entry's job id
                is not among the jobs sent by the shard.
        """
        if self.job_id not in job_ids_sent:
            raise error.UnallowedRecordsSentToMaster(
                'Sent HostQueueEntry without corresponding '
                'job entry: %s' % updated_serialized)


    # Status constants shared with the scheduler.
    Status = host_queue_entry_states.Status
    ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES
    COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES
    PRE_JOB_STATUSES = host_queue_entry_states.PRE_JOB_STATUSES
    IDLE_PRE_JOB_STATUSES = host_queue_entry_states.IDLE_PRE_JOB_STATUSES

    job = dbmodels.ForeignKey(Job)
    host = dbmodels.ForeignKey(Host, blank=True, null=True)
    status = dbmodels.CharField(max_length=255)
    meta_host = dbmodels.ForeignKey(Label, blank=True, null=True,
                                    db_column='meta_host')
    # active/complete are derived from status on every save()
    # (see _set_active_and_complete).
    active = dbmodels.BooleanField(default=False)
    complete = dbmodels.BooleanField(default=False)
    deleted = dbmodels.BooleanField(default=False)
    execution_subdir = dbmodels.CharField(max_length=255, blank=True,
                                          default='')
    # If atomic_group is set, this is a virtual HostQueueEntry that will
    # be expanded into many actual hosts within the group at schedule time.
    atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True)
    aborted = dbmodels.BooleanField(default=False)
    started_on = dbmodels.DateTimeField(null=True, blank=True)
    finished_on = dbmodels.DateTimeField(null=True, blank=True)

    objects = model_logic.ExtendedManager()


    def __init__(self, *args, **kwargs):
        """Initialize the entry and begin tracking status changes."""
        super(HostQueueEntry, self).__init__(*args, **kwargs)
        # Record the initial status so that changes can be logged on save
        # (see on_attribute_changed).
        self._record_attributes(['status'])


    @classmethod
    def create(cls, job, host=None, meta_host=None,
                 is_template=False):
        """Creates a new host queue entry.

        @param cls: Implicit class object.
        @param job: The associated job.
        @param host: The associated host.
        @param meta_host: The associated meta host.
        @param is_template: Whether the status should be "Template".

        @returns: A new, unsaved HostQueueEntry instance.
        """
        if is_template:
            status = cls.Status.TEMPLATE
        else:
            status = cls.Status.QUEUED

        return cls(job=job, host=host, meta_host=meta_host, status=status)


    def save(self, *args, **kwargs):
        """Save the entry, syncing active/complete flags with status."""
        self._set_active_and_complete()
        super(HostQueueEntry, self).save(*args, **kwargs)
        # Logs any status transition recorded since __init__/last save.
        self._check_for_updated_attributes()


    def execution_path(self):
        """
        Path to this entry's results (relative to the base results directory).
        """
        return server_utils.get_hqe_exec_path(self.job.tag(),
                                              self.execution_subdir)


    def host_or_metahost_name(self):
        """Returns the first non-None name found in priority order.

        The priority order checked is: (1) host name; (2) meta host name
        """
        if self.host:
            return self.host.hostname
        else:
            assert self.meta_host
            return self.meta_host.name


    def _set_active_and_complete(self):
        """Derive the active/complete flags from the current status."""
        if self.status in self.ACTIVE_STATUSES:
            self.active, self.complete = True, False
        elif self.status in self.COMPLETE_STATUSES:
            self.active, self.complete = False, True
        else:
            # Neither active nor complete (e.g. a pre-job/idle status).
            self.active, self.complete = False, False


    def on_attribute_changed(self, attribute, old_value):
        """Log status transitions; only 'status' is tracked."""
        assert attribute == 'status'
        logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id,
                     self.status)


    def is_meta_host_entry(self):
        """True if this entry has a meta_host instead of a host."""
        return self.host is None and self.meta_host is not None


    # This code is shared between rpc_interface and models.HostQueueEntry.
    # Sadly due to circular imports between the 2 (crbug.com/230100) making it
    # a class method was the best way to refactor it. Attempting to put it in
    # rpc_utils or a new utils module failed as that would require us to import
    # models.py but to call it from here we would have to import the utils.py
    # thus creating a cycle.
    @classmethod
    def abort_host_queue_entries(cls, host_queue_entries):
        """Aborts a collection of host_queue_entries.

        Abort these host queue entry and all host queue entries of jobs created
        by them.

        @param host_queue_entries: List of host queue entries we want to abort.
        """
        # This isn't completely immune to race conditions since it's not atomic,
        # but it should be safe given the scheduler's behavior.

        # TODO(milleral): crbug.com/230100
        # The |abort_host_queue_entries| rpc does nearly exactly this,
        # however, trying to re-use the code generates some horrible
        # circular import error.  I'd be nice to refactor things around
        # sometime so the code could be reused.

        # Fixpoint algorithm to find the whole tree of HQEs to abort to
        # minimize the total number of database queries:
        children = set()
        new_children = set(host_queue_entries)
        while new_children:
            children.update(new_children)
            new_child_ids = [hqe.job_id for hqe in new_children]
            # One query per tree level: fetch all not-yet-finished HQEs of
            # jobs whose parent job is in the current frontier.
            new_children = HostQueueEntry.objects.filter(
                    job__parent_job__in=new_child_ids,
                    complete=False, aborted=False).all()
            # To handle circular parental relationships
            new_children = set(new_children) - children

        # Associate a user with the host queue entries that we're about
        # to abort so that we can look up who to blame for the aborts.
        now = datetime.now()
        user = User.current_user()
        aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe,
                aborted_by=user, aborted_on=now) for hqe in children]
        AbortedHostQueueEntry.objects.bulk_create(aborted_hqes)
        # Bulk update all of the HQEs to set the abort bit.
        child_ids = [hqe.id for hqe in children]
        HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True)


    def abort(self):
        """ Aborts this host queue entry.

        Abort this host queue entry and all host queue entries of jobs created by
        this one.

        """
        # No-op for entries that are already finished or aborted.
        if not self.complete and not self.aborted:
            HostQueueEntry.abort_host_queue_entries([self])


    @classmethod
    def compute_full_status(cls, status, aborted, complete):
        """Returns a modified status msg if the host queue entry was aborted.

        @param cls: Implicit class object.
        @param status: The original status message.
        @param aborted: Whether the host queue entry was aborted.
        @param complete: Whether the host queue entry was completed.
        """
        if aborted and not complete:
            return 'Aborted (%s)' % status
        return status


    def full_status(self):
        """Returns the full status of this host queue entry, as a string."""
        return self.compute_full_status(self.status, self.aborted,
                                        self.complete)


    def _postprocess_object_dict(self, object_dict):
        """Add the computed full_status to a serialized representation."""
        object_dict['full_status'] = self.full_status()


    class Meta:
        """Metadata for class HostQueueEntry."""
        db_table = 'afe_host_queue_entries'



    def __unicode__(self):
        """Human-readable form: '<hostname>/<job id> (<entry id>)'."""
        hostname = None
        if self.host:
            hostname = self.host.hostname
        return u"%s/%d (%d)" % (hostname, self.job.id, self.id)
1800
1801
class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
    """Represents an aborted host queue entry."""
    # One record per aborted HQE, recording who aborted it and when.
    queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True)
    aborted_by = dbmodels.ForeignKey(User)
    aborted_on = dbmodels.DateTimeField()

    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save the record, stamping aborted_on with the current time.

        NOTE(review): every save (not just the first) refreshes the
        timestamp, overwriting any previously assigned aborted_on value.
        """
        self.aborted_on = datetime.now()
        super(AbortedHostQueueEntry, self).save(*args, **kwargs)

    class Meta:
        """Metadata for class AbortedHostQueueEntry."""
        db_table = 'afe_aborted_host_queue_entries'
1818
1819
class SpecialTask(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Tasks to run on hosts at the next time they are in the Ready state. Use this
    for high-priority tasks, such as forced repair or forced reinstall.

    host: host to run this task on
    task: special task to run
    requested_by: user on whose behalf the task runs (forced to the queue
                  entry's job owner on save, when a queue_entry is set)
    time_requested: date and time the request for this task was made
    is_active: task is currently running
    is_complete: task has finished running
    is_aborted: task was aborted
    success: whether the task finished successfully
    time_started: date and time the task started
    time_finished: date and time the task finished
    queue_entry: Host queue entry waiting on this task (or None, if task was not
                 started in preparation of a job)
    """
    # Closed set of special-task types; stored as strings in the db.
    Task = enum.Enum('Verify', 'Cleanup', 'Repair', 'Reset', 'Provision',
                     string_values=True)

    host = dbmodels.ForeignKey(Host, blank=False, null=False)
    task = dbmodels.CharField(max_length=64, choices=Task.choices(),
                              blank=False, null=False)
    requested_by = dbmodels.ForeignKey(User)
    time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False,
                                            null=False)
    is_active = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_complete = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_started = dbmodels.DateTimeField(null=True, blank=True)
    queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True)
    success = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_finished = dbmodels.DateTimeField(null=True, blank=True)

    objects = model_logic.ExtendedManager()


    def save(self, **kwargs):
        """Save the task, deriving requested_by from the queue entry.

        If this task was created on behalf of a queue entry, requested_by
        is overwritten with the owner of that entry's job.
        """
        if self.queue_entry:
            self.requested_by = User.objects.get(
                    login=self.queue_entry.job.owner)
        super(SpecialTask, self).save(**kwargs)


    def execution_path(self):
        """Returns the execution path for a special task."""
        return server_utils.get_special_task_exec_path(
                self.host.hostname, self.id, self.task, self.time_requested)


    # property to emulate HostQueueEntry.status
    @property
    def status(self):
        """Returns a host queue entry status appropriate for a special task."""
        return server_utils.get_special_task_status(
                self.is_complete, self.success, self.is_active)


    # property to emulate HostQueueEntry.started_on
    @property
    def started_on(self):
        """Returns the time at which this special task started."""
        return self.time_started


    @classmethod
    def schedule_special_task(cls, host, task):
        """Schedules a special task on a host if not already scheduled.

        @param cls: Implicit class object.
        @param host: The host to use.
        @param task: The task to schedule.
        """
        # Reuse an already-queued (not started, not finished) identical task
        # instead of creating a duplicate.
        existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task,
                                                    is_active=False,
                                                    is_complete=False)
        if existing_tasks:
            return existing_tasks[0]

        special_task = SpecialTask(host=host, task=task,
                                   requested_by=User.current_user())
        special_task.save()
        return special_task


    def abort(self):
        """ Abort this special task."""
        self.is_aborted = True
        self.save()


    def activate(self):
        """
        Sets a task as active and sets the time started to the current time.
        """
        logging.info('Starting: %s', self)
        self.is_active = True
        self.time_started = datetime.now()
        self.save()


    def finish(self, success):
        """Sets a task as completed.

        @param success: Whether or not the task was successful.
        """
        logging.info('Finished: %s', self)
        self.is_active = False
        self.is_complete = True
        self.success = success
        # Only stamp time_finished for tasks that actually started.
        if self.time_started:
            self.time_finished = datetime.now()
        self.save()


    class Meta:
        """Metadata for class SpecialTask."""
        db_table = 'afe_special_tasks'


    def __unicode__(self):
        """Human-readable description, annotated with completion state."""
        result = u'Special Task %s (host %s, task %s, time %s)' % (
            self.id, self.host, self.task, self.time_requested)
        if self.is_complete:
            result += u' (completed)'
        elif self.is_active:
            result += u' (active)'

        return result
1948
1949
class StableVersion(dbmodels.Model, model_logic.ModelExtensions):
    """Per-board stable version record.

    NOTE(review): purpose inferred from the field names — each (unique)
    board maps to a single version string; confirm against callers.
    """

    board = dbmodels.CharField(max_length=255, unique=True)
    version = dbmodels.CharField(max_length=255)

    class Meta:
        """Metadata for class StableVersion."""
        db_table = 'afe_stable_versions'
1958