1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5#pylint: disable-msg=C0111
6
7import os
8import logging
9import time
10
11from autotest_lib.client.common_lib import global_config
12from autotest_lib.client.common_lib.cros.graphite import autotest_stats
13from autotest_lib.frontend.afe import models
14from autotest_lib.scheduler import email_manager
15from autotest_lib.scheduler import scheduler_config, scheduler_models
16
17
18# Override default parser with our site parser.
def parser_path(install_dir):
    """Build the location of the site parser below install_dir.

    @param install_dir: installation directory.

    @return: install_dir joined with the 'tko' and 'site_parse' components.
    """
    components = (install_dir, 'tko', 'site_parse')
    return os.path.join(*components)
25
26
class SiteAgentTask(object):
    """
    SiteAgentTask subclasses BaseAgentTask in monitor_db.
    """


    def _archive_results(self, queue_entries):
        """
        Set the status of queue_entries to ARCHIVING.

        This method sets the status of the queue_entries to ARCHIVING
        if the enable_archiving flag is true in global_config.ini.
        Otherwise, it bypasses the archiving step and sets the queue entries
        to the final status of current step.

        @param queue_entries: iterable of queue entries to update; each one
                must provide a set_status(status) method.
        """
        enable_archiving = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)
        # Set the status of the queue entries to archiving or self final status
        if enable_archiving:
            status = models.HostQueueEntry.Status.ARCHIVING
        else:
            status = self._final_status()

        # Operate on the entries handed in by the caller rather than on
        # self.queue_entries, so the method honours its own argument.
        for queue_entry in queue_entries:
            queue_entry.set_status(status)


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        """
        Forked from monitor_db.py

        Check every queue entry against the allowed statuses. Instead of
        raising on a violation, the problem is logged, a notification email
        is enqueued and an abort is requested for the entry's job.

        @param queue_entries: iterable of queue entries to check. Each entry
                is read for its status, host and job attributes.
        @param allowed_hqe_statuses: container of acceptable entry statuses.
        @param allowed_host_statuses: container of acceptable host statuses,
                or None to skip the host status check altogether.
        """
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                # In the original code, here we raise an exception. In an
                # effort to prevent downtime we will instead abort the job and
                # send out an email notifying us this has occurred.
                error_message = ('%s attempting to start entry with invalid '
                                 'status %s: %s. Aborting Job: %s.'
                                 % (class_name, entry.status, entry,
                                    entry.job))
                logging.error(error_message)
                email_manager.manager.enqueue_notify_email(
                    'Job Aborted - Invalid Host Queue Entry Status',
                    error_message)
                entry.job.request_abort()
            # entry.host is only touched when a host status whitelist was
            # supplied (short-circuit on the None check).
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                # In the original code, here we raise an exception. In an
                # effort to prevent downtime we will instead abort the job and
                # send out an email notifying us this has occurred.
                error_message = ('%s attempting to start on queue entry with '
                                 'invalid host status %s: %s. Aborting Job: %s'
                                 % (class_name, entry.host.status, entry,
                                    entry.job))
                logging.error(error_message)
                email_manager.manager.enqueue_notify_email(
                    'Job Aborted - Invalid Host Status', error_message)
                entry.job.request_abort()
89
90
class SiteDispatcher(object):
    """
    SiteDispatcher subclasses BaseDispatcher in monitor_db.

    The overridden scheduler steps below are wrapped by the class-level
    timer and, once a tick has started, report the time elapsed since the
    start of that tick to the class-level gauge.
    """
    DEFAULT_REQUESTED_BY_USER_ID = 1


    _timer = autotest_stats.Timer('scheduler')
    _gauge = autotest_stats.Gauge('scheduler_rel')
    # time.time() value recorded at the start of the most recent tick, or
    # None when tick() has not run yet.
    _tick_start = None


    def _send_elapsed_since_tick(self, stat_name):
        """Send the time elapsed since the current tick began to the gauge.

        Nothing is sent when no tick has started yet.

        @param stat_name: name under which the elapsed time is sent.
        """
        if self._tick_start:
            self._gauge.send(stat_name, time.time() - self._tick_start)


    @_timer.decorate
    def tick(self):
        """Run the parent tick and send its total duration to the gauge."""
        self._tick_start = time.time()
        super(SiteDispatcher, self).tick()
        self._gauge.send('tick', time.time() - self._tick_start)

    @_timer.decorate
    def _garbage_collection(self):
        super(SiteDispatcher, self)._garbage_collection()
        self._send_elapsed_since_tick('_garbage_collection')

    @_timer.decorate
    def _run_cleanup(self):
        super(SiteDispatcher, self)._run_cleanup()
        self._send_elapsed_since_tick('_run_cleanup')

    @_timer.decorate
    def _find_aborting(self):
        super(SiteDispatcher, self)._find_aborting()
        self._send_elapsed_since_tick('_find_aborting')

    @_timer.decorate
    def _process_recurring_runs(self):
        super(SiteDispatcher, self)._process_recurring_runs()
        self._send_elapsed_since_tick('_process_recurring_runs')

    @_timer.decorate
    def _schedule_delay_tasks(self):
        super(SiteDispatcher, self)._schedule_delay_tasks()
        self._send_elapsed_since_tick('_schedule_delay_tasks')

    @_timer.decorate
    def _schedule_running_host_queue_entries(self):
        super(SiteDispatcher, self)._schedule_running_host_queue_entries()
        self._send_elapsed_since_tick('_schedule_running_host_queue_entries')

    @_timer.decorate
    def _schedule_special_tasks(self):
        super(SiteDispatcher, self)._schedule_special_tasks()
        self._send_elapsed_since_tick('_schedule_special_tasks')

    @_timer.decorate
    def _schedule_new_jobs(self):
        super(SiteDispatcher, self)._schedule_new_jobs()
        self._send_elapsed_since_tick('_schedule_new_jobs')


    @_timer.decorate
    def _handle_agents(self):
        super(SiteDispatcher, self)._handle_agents()
        self._send_elapsed_since_tick('_handle_agents')


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        """
        This is an altered version of _reverify_hosts_where: the call to
        models.SpecialTask.objects.create passes in an argument for
        requested_by, in order to allow the Reset task to be created
        properly.

        @param where: SQL condition appended to the 'locked = 0 AND
                invalid = 0' filter used to fetch the hosts.
        @param print_message: logging format string that receives the
                hostname; a false value disables the log line.
        """
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        # The requesting user is the same for every host, so it is looked up
        # at most once per call, and only if a task really has to be created.
        requested_by = None
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.error(print_message, host.hostname)
            if requested_by is None:
                try:
                    requested_by = models.User.objects.get(
                            login='autotest_system')
                except models.User.DoesNotExist:
                    # Fall back to the user with the default id.
                    requested_by = models.User.objects.get(
                            id=SiteDispatcher.DEFAULT_REQUESTED_BY_USER_ID)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.RESET,
                    host=models.Host.objects.get(id=host.id),
                    requested_by=requested_by)


    def _check_for_unrecovered_verifying_entries(self):
        """
        Set Resetting queue entries that have no incomplete Cleanup, Verify
        or Reset special task back to Queued.
        """
        # Verify is replaced by Reset.
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.RESETTING)
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY,
                              models.SpecialTask.Task.RESET),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                logging.error('Unrecovered Resetting host queue entry: %s. '
                              'Setting status to Queued.', str(queue_entry))
                # Essentially this host queue entry was set to be Resetting
                # however no special task exists for entry. This occurs if the
                # scheduler dies between changing the status and creating the
                # special task. By setting it to queued, the job can restart
                # from the beginning and proceed correctly. This is much more
                # preferable than having monitor_db not launching.
                queue_entry.set_status('Queued')
221