monitor_db_cleanup.py revision 6157c63947d2d628d187a084acb0a48473af1c79
1""" 2Autotest AFE Cleanup used by the scheduler 3""" 4 5 6import datetime, time, logging 7import common 8from autotest_lib.database import database_connection 9from autotest_lib.frontend.afe import models 10from autotest_lib.scheduler import email_manager, scheduler_config 11 12 13class PeriodicCleanup(object): 14 15 16 def __init__(self, db, clean_interval, run_at_initialize=False): 17 self._db = db 18 self.clean_interval = clean_interval 19 self._last_clean_time = time.time() 20 self._run_at_initialize = run_at_initialize 21 22 23 def initialize(self): 24 if self._run_at_initialize: 25 self._cleanup() 26 27 28 def run_cleanup_maybe(self): 29 should_cleanup = (self._last_clean_time + self.clean_interval * 60 30 < time.time()) 31 if should_cleanup: 32 self._cleanup() 33 self._last_clean_time = time.time() 34 35 36 def _cleanup(self): 37 """Abrstract cleanup method.""" 38 raise NotImplementedError 39 40 41class UserCleanup(PeriodicCleanup): 42 """User cleanup that is controlled by the global config variable 43 clean_interval in the SCHEDULER section. 44 """ 45 46 47 def __init__(self, db, clean_interval_minutes): 48 super(UserCleanup, self).__init__(db, clean_interval_minutes) 49 50 51 def _cleanup(self): 52 logging.info('Running periodic cleanup') 53 self._abort_timed_out_jobs() 54 self._abort_jobs_past_synch_start_timeout() 55 self._abort_jobs_past_max_runtime() 56 self._clear_inactive_blocks() 57 self._check_for_db_inconsistencies() 58 59 60 def _abort_timed_out_jobs(self): 61 msg = 'Aborting all jobs that have timed out and are not complete' 62 logging.info(msg) 63 query = models.Job.objects.filter(hostqueueentry__complete=False).extra( 64 where=['created_on + INTERVAL timeout HOUR < NOW()']) 65 for job in query.distinct(): 66 logging.warning('Aborting job %d due to job timeout', job.id) 67 job.abort(None) 68 69 70 def _abort_jobs_past_synch_start_timeout(self): 71 """ 72 Abort synchronous jobs that are past the start timeout (from global 73 config) and are holding a machine that's in everyone. 74 """ 75 msg = 'Aborting synchronous jobs that are past the start timeout' 76 logging.info(msg) 77 timeout_delta = datetime.timedelta( 78 minutes=scheduler_config.config.synch_job_start_timeout_minutes) 79 timeout_start = datetime.datetime.now() - timeout_delta 80 query = models.Job.objects.filter( 81 created_on__lt=timeout_start, 82 hostqueueentry__status='Pending', 83 hostqueueentry__host__aclgroup__name='Everyone') 84 for job in query.distinct(): 85 logging.warning('Aborting job %d due to start timeout', job.id) 86 entries_to_abort = job.hostqueueentry_set.exclude( 87 status=models.HostQueueEntry.Status.RUNNING) 88 for queue_entry in entries_to_abort: 89 queue_entry.abort(None) 90 91 92 def _abort_jobs_past_max_runtime(self): 93 """ 94 Abort executions that have started and are past the job's max runtime. 
95 """ 96 logging.info('Aborting all jobs that have passed maximum runtime') 97 rows = self._db.execute(""" 98 SELECT hqe.id 99 FROM host_queue_entries AS hqe 100 INNER JOIN jobs ON (hqe.job_id = jobs.id) 101 WHERE NOT hqe.complete AND NOT hqe.aborted AND 102 hqe.started_on + INTERVAL jobs.max_runtime_hrs HOUR < NOW()""") 103 query = models.HostQueueEntry.objects.filter( 104 id__in=[row[0] for row in rows]) 105 for queue_entry in query.distinct(): 106 logging.warning('Aborting entry %s due to max runtime', queue_entry) 107 queue_entry.abort(None) 108 109 110 def _check_for_db_inconsistencies(self): 111 logging.info('Cleaning db inconsistencies') 112 self._check_all_invalid_related_objects() 113 114 115 def _check_invalid_related_objects_one_way(self, first_model, 116 relation_field, second_model): 117 if 'invalid' not in first_model.get_field_dict(): 118 return [] 119 invalid_objects = list(first_model.objects.filter(invalid=True)) 120 first_model.objects.populate_relationships(invalid_objects, 121 second_model, 122 'related_objects') 123 error_lines = [] 124 for invalid_object in invalid_objects: 125 if invalid_object.related_objects: 126 related_list = ', '.join(str(related_object) for related_object 127 in invalid_object.related_objects) 128 error_lines.append('Invalid %s %s is related to %ss: %s' 129 % (first_model.__name__, invalid_object, 130 second_model.__name__, related_list)) 131 related_manager = getattr(invalid_object, relation_field) 132 related_manager.clear() 133 return error_lines 134 135 136 def _check_invalid_related_objects(self, first_model, first_field, 137 second_model, second_field): 138 errors = self._check_invalid_related_objects_one_way( 139 first_model, first_field, second_model) 140 errors.extend(self._check_invalid_related_objects_one_way( 141 second_model, second_field, first_model)) 142 return errors 143 144 145 def _check_all_invalid_related_objects(self): 146 model_pairs = ((models.Host, 'labels', models.Label, 'host_set'), 147 (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'), 148 (models.AclGroup, 'users', models.User, 'aclgroup_set'), 149 (models.Test, 'dependency_labels', models.Label, 150 'test_set')) 151 errors = [] 152 for first_model, first_field, second_model, second_field in model_pairs: 153 errors.extend(self._check_invalid_related_objects( 154 first_model, first_field, second_model, second_field)) 155 156 if errors: 157 subject = ('%s relationships to invalid models, cleaned all' % 158 len(errors)) 159 message = '\n'.join(errors) 160 logging.warning(subject) 161 logging.warning(message) 162 email_manager.manager.enqueue_notify_email(subject, message) 163 164 165 def _clear_inactive_blocks(self): 166 msg = 'Clear out blocks for all completed jobs.' 167 logging.info(msg) 168 # this would be simpler using NOT IN (subquery), but MySQL 169 # treats all IN subqueries as dependent, so this optimizes much 170 # better 171 self._db.execute(""" 172 DELETE ihq FROM ineligible_host_queues ihq 173 LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries 174 WHERE NOT complete) hqe 175 USING (job_id) WHERE hqe.job_id IS NULL""") 176 177 178class TwentyFourHourUpkeep(PeriodicCleanup): 179 """Cleanup that runs at the startup of monitor_db and every subsequent 180 twenty four hours. 
181 """ 182 183 184 def __init__(self, db, run_at_initialize=True): 185 clean_interval = 24 * 60 # 24 hours 186 super(TwentyFourHourUpkeep, self).__init__( 187 db, clean_interval, run_at_initialize=run_at_initialize) 188 189 190 def _cleanup(self): 191 logging.info('Running 24 hour clean up') 192 self._django_session_cleanup() 193 self._check_for_uncleanable_db_inconsistencies() 194 195 196 def _django_session_cleanup(self): 197 """Clean up django_session since django doesn't for us. 198 http://www.djangoproject.com/documentation/0.96/sessions/ 199 """ 200 logging.info('Deleting old sessions from django_session') 201 sql = 'DELETE FROM django_session WHERE expire_date < NOW()' 202 self._db.execute(sql) 203 204 205 def _check_for_uncleanable_db_inconsistencies(self): 206 logging.info('Checking for uncleanable DB inconsistencies') 207 self._check_for_active_and_complete_queue_entries() 208 self._check_for_multiple_platform_hosts() 209 self._check_for_no_platform_hosts() 210 self._check_for_multiple_atomic_group_hosts() 211 212 213 def _check_for_active_and_complete_queue_entries(self): 214 query = models.HostQueueEntry.objects.filter(active=True, complete=True) 215 if query.count() != 0: 216 subject = ('%d queue entries found with active=complete=1' 217 % query.count()) 218 lines = [str(entry.get_object_dict()) for entry in query] 219 self._send_inconsistency_message(subject, lines) 220 221 222 def _check_for_multiple_platform_hosts(self): 223 rows = self._db.execute(""" 224 SELECT hosts.id, hostname, COUNT(1) AS platform_count, 225 GROUP_CONCAT(labels.name) 226 FROM hosts 227 INNER JOIN hosts_labels ON hosts.id = hosts_labels.host_id 228 INNER JOIN labels ON hosts_labels.label_id = labels.id 229 WHERE labels.platform 230 GROUP BY hosts.id 231 HAVING platform_count > 1 232 ORDER BY hostname""") 233 if rows: 234 subject = '%s hosts with multiple platforms' % self._db.rowcount 235 lines = [' '.join(str(item) for item in row) 236 for row in rows] 237 self._send_inconsistency_message(subject, lines) 238 239 240 def _check_for_no_platform_hosts(self): 241 rows = self._db.execute(""" 242 SELECT hostname 243 FROM hosts 244 LEFT JOIN hosts_labels 245 ON hosts.id = hosts_labels.host_id 246 AND hosts_labels.label_id IN (SELECT id FROM labels 247 WHERE platform) 248 WHERE NOT hosts.invalid AND hosts_labels.host_id IS NULL""") 249 if rows: 250 subject = '%s hosts with no platform' % self._db.rowcount 251 self._send_inconsistency_message( 252 subject, [', '.join(row[0] for row in rows)]) 253 254 255 def _check_for_multiple_atomic_group_hosts(self): 256 rows = self._db.execute(""" 257 SELECT hosts.id, hostname, COUNT(1) AS atomic_group_count, 258 GROUP_CONCAT(labels.name), GROUP_CONCAT(atomic_groups.name) 259 FROM hosts 260 INNER JOIN hosts_labels ON hosts.id = hosts_labels.host_id 261 INNER JOIN labels ON hosts_labels.label_id = labels.id 262 INNER JOIN atomic_groups ON 263 labels.atomic_group_id = atomic_groups.id 264 WHERE NOT hosts.invalid AND NOT labels.invalid 265 GROUP BY hosts.id 266 HAVING atomic_group_count > 1 267 ORDER BY hostname""") 268 if rows: 269 subject = '%s hosts with multiple atomic groups' % self._db.rowcount 270 lines = [' '.join(str(item) for item in row) 271 for row in rows] 272 self._send_inconsistency_message(subject, lines) 273 274 275 def _send_inconsistency_message(self, subject, lines): 276 logging.error(subject) 277 message = '\n'.join(lines) 278 if len(message) > 5000: 279 message = message[:5000] + '\n(truncated)\n' 280 email_manager.manager.enqueue_notify_email(subject, 
message) 281
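
# A minimal sketch of how this daily upkeep is expected to be scheduled (the
# variable names below are illustrative, not taken from monitor_db itself):
# with the default run_at_initialize=True, initialize() performs the first
# pass right away, and run_cleanup_maybe() re-runs _cleanup() only once
# 24 * 60 minutes have elapsed since the previous pass.
#
#   upkeep = TwentyFourHourUpkeep(db)   # db: the scheduler's DatabaseConnection
#   upkeep.initialize()                 # first pass at startup
#   ...
#   upkeep.run_cleanup_maybe()          # called each scheduler cycle; a no-op
#                                       # until a day has passed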