# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

#pylint: disable-msg=C0111

import os
import logging
import time

from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import scheduler_config, scheduler_models


# Override default parser with our site parser.
def parser_path(install_dir):
    """Return site implementation of parser.

    @param install_dir: installation directory.
    """
    return os.path.join(install_dir, 'tko', 'site_parse')


class SiteAgentTask(object):
    """
    SiteAgentTask subclasses BaseAgentTask in monitor_db.
    """


    def _archive_results(self, queue_entries):
        """
        Set the status of queue_entries to ARCHIVING.

        This method sets the status of the queue_entries to ARCHIVING
        if the enable_archiving flag is true in global_config.ini.
        Otherwise, it bypasses the archiving step and sets the queue entries
        to the final status of current step.

        @param queue_entries: host queue entries whose status will be set.
        """
        enable_archiving = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)
        # Set the status of the queue entries to archiving or self final status
        if enable_archiving:
            status = models.HostQueueEntry.Status.ARCHIVING
        else:
            status = self._final_status()

        # Update exactly the entries the caller handed us rather than
        # silently falling back to self.queue_entries.
        for queue_entry in queue_entries:
            queue_entry.set_status(status)


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        """
        Forked from monitor_db.py

        Where the original raised an exception on an invalid status, this
        version logs the problem, sends a notification email and requests
        that the entry's job be aborted.

        @param queue_entries: host queue entries about to be started.
        @param allowed_hqe_statuses: statuses a queue entry may be in.
        @param allowed_host_statuses: statuses the entry's host may be in, or
                None to skip the host status check.
        """
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                # In the original code, here we raise an exception. In an
                # effort to prevent downtime we will instead abort the job and
                # send out an email notifying us this has occurred.
                error_message = ('%s attempting to start entry with invalid '
                                 'status %s: %s. Aborting Job: %s.'
                                 % (class_name, entry.status, entry,
                                    entry.job))
                logging.error(error_message)
                email_manager.manager.enqueue_notify_email(
                        'Job Aborted - Invalid Host Queue Entry Status',
                        error_message)
                entry.job.request_abort()
                # The original raised here, so the host check below never ran
                # for this entry. Skip it as well, instead of aborting the
                # same job and emailing a second time.
                continue
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                # In the original code, here we raise an exception. In an
                # effort to prevent downtime we will instead abort the job and
                # send out an email notifying us this has occurred.
                error_message = ('%s attempting to start on queue entry with '
                                 'invalid host status %s: %s. Aborting Job: %s'
                                 % (class_name, entry.host.status, entry,
                                    entry.job))
                logging.error(error_message)
                email_manager.manager.enqueue_notify_email(
                        'Job Aborted - Invalid Host Status', error_message)
                entry.job.request_abort()


class SiteDispatcher(object):
    """
    SiteDispatcher subclasses BaseDispatcher in monitor_db.

    Every scheduler step below is wrapped with the _timer decorator and,
    after the step runs, reports to the 'scheduler_rel' gauge the number of
    seconds elapsed since the start of the current tick. These gauge values
    are therefore cumulative offsets within a tick, not the duration of the
    individual step.
    """
    DEFAULT_REQUESTED_BY_USER_ID = 1


    _timer = autotest_stats.Timer('scheduler')
    _gauge = autotest_stats.Gauge('scheduler_rel')
    # Wall-clock time at which the current tick began. It stays None until
    # tick() first runs; the per-step gauges are skipped while it is unset.
    _tick_start = None


    @_timer.decorate
    def tick(self):
        self._tick_start = time.time()
        super(SiteDispatcher, self).tick()
        self._gauge.send('tick', time.time() - self._tick_start)

    @_timer.decorate
    def _garbage_collection(self):
        super(SiteDispatcher, self)._garbage_collection()
        if self._tick_start:
            self._gauge.send('_garbage_collection',
                             time.time() - self._tick_start)

    @_timer.decorate
    def _run_cleanup(self):
        super(SiteDispatcher, self)._run_cleanup()
        if self._tick_start:
            self._gauge.send('_run_cleanup', time.time() - self._tick_start)

    @_timer.decorate
    def _find_aborting(self):
        super(SiteDispatcher, self)._find_aborting()
        if self._tick_start:
            self._gauge.send('_find_aborting', time.time() - self._tick_start)

    @_timer.decorate
    def _process_recurring_runs(self):
        super(SiteDispatcher, self)._process_recurring_runs()
        if self._tick_start:
            self._gauge.send('_process_recurring_runs',
                             time.time() - self._tick_start)

    @_timer.decorate
    def _schedule_delay_tasks(self):
        super(SiteDispatcher, self)._schedule_delay_tasks()
        if self._tick_start:
            self._gauge.send('_schedule_delay_tasks',
                             time.time() - self._tick_start)

    @_timer.decorate
    def _schedule_running_host_queue_entries(self):
        super(SiteDispatcher, self)._schedule_running_host_queue_entries()
        if self._tick_start:
            self._gauge.send('_schedule_running_host_queue_entries',
                             time.time() - self._tick_start)

    @_timer.decorate
    def _schedule_special_tasks(self):
        super(SiteDispatcher, self)._schedule_special_tasks()
        if self._tick_start:
            self._gauge.send('_schedule_special_tasks',
                             time.time() - self._tick_start)

    @_timer.decorate
    def _schedule_new_jobs(self):
        super(SiteDispatcher, self)._schedule_new_jobs()
        if self._tick_start:
            self._gauge.send('_schedule_new_jobs',
                             time.time() - self._tick_start)


    @_timer.decorate
    def _handle_agents(self):
        super(SiteDispatcher, self)._handle_agents()
        if self._tick_start:
            self._gauge.send('_handle_agents', time.time() - self._tick_start)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        """
        This is an altered version of _reverify_hosts_where: the call to
        models.SpecialTask.objects.create passes in an argument for
        requested_by, in order to allow the Reset task to be created
        properly.

        @param where: extra SQL condition appended to the unlocked/valid
                host filter.
        @param print_message: format string logged (with the hostname) for
                each host that gets a Reset task; falsy to log nothing.
        """
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        # The requesting user is the same for every host, so look it up at
        # most once, and only if some host actually needs a Reset task.
        user = None
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.error(print_message, host.hostname)
            if user is None:
                try:
                    user = models.User.objects.get(login='autotest_system')
                except models.User.DoesNotExist:
                    user = models.User.objects.get(
                            id=SiteDispatcher.DEFAULT_REQUESTED_BY_USER_ID)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.RESET,
                    host=models.Host.objects.get(id=host.id),
                    requested_by=user)


    def _check_for_unrecovered_verifying_entries(self):
        """
        Requeue Resetting host queue entries that have no pending special task.

        An entry in the RESETTING status with no incomplete CLEANUP, VERIFY
        or RESET special task is logged and set back to 'Queued'.
        """
        # Verify is replaced by Reset.
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.RESETTING)
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY,
                              models.SpecialTask.Task.RESET),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                logging.error('Unrecovered Resetting host queue entry: %s. '
                              'Setting status to Queued.', str(queue_entry))
                # Essentially this host queue entry was set to Resetting,
                # however no special task exists for the entry. This occurs
                # if the scheduler dies between changing the status and
                # creating the special task. By setting it to queued, the job
                # can restart from the beginning and proceed correctly. This
                # is much more preferable than having monitor_db not
                # launching.
                queue_entry.set_status('Queued')