monitor_db_cleanup.py revision 6157c63947d2d628d187a084acb0a48473af1c79
1"""
2Autotest AFE Cleanup used by the scheduler
3"""
4
5
6import datetime, time, logging
7import common
8from autotest_lib.database import database_connection
9from autotest_lib.frontend.afe import models
10from autotest_lib.scheduler import email_manager, scheduler_config
11
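
# Example wiring (a minimal sketch, not the scheduler's actual setup; the
# DatabaseConnection arguments are left out and the interval value shown is
# only an example):
#
#     db = database_connection.DatabaseConnection(...)  # args omitted
#     db.connect()
#     user_cleanup = UserCleanup(db, clean_interval_minutes=24)
#     upkeep = TwentyFourHourUpkeep(db)
#     for cleanup in (user_cleanup, upkeep):
#         cleanup.initialize()
#     # ...then, once per scheduler loop iteration:
#     for cleanup in (user_cleanup, upkeep):
#         cleanup.run_cleanup_maybe()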


class PeriodicCleanup(object):
    """Base class for cleanup tasks that the scheduler runs periodically."""


    def __init__(self, db, clean_interval, run_at_initialize=False):
        self._db = db
        self.clean_interval = clean_interval
        self._last_clean_time = time.time()
        self._run_at_initialize = run_at_initialize


    def initialize(self):
        if self._run_at_initialize:
            self._cleanup()


    def run_cleanup_maybe(self):
        """Run _cleanup() if at least clean_interval minutes have passed."""
        should_cleanup = (self._last_clean_time + self.clean_interval * 60
                          < time.time())
        if should_cleanup:
            self._cleanup()
            self._last_clean_time = time.time()


    def _cleanup(self):
        """Abstract cleanup method."""
        raise NotImplementedError


class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
       clean_interval in the SCHEDULER section.
    """
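    # The clean interval comes from the scheduler's global config.  A minimal
    # sketch of the assumed global_config.ini entry (the value is only an
    # example):
    #
    #     [SCHEDULER]
    #     clean_interval: 24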


    def __init__(self, db, clean_interval_minutes):
        super(UserCleanup, self).__init__(db, clean_interval_minutes)


    def _cleanup(self):
        logging.info('Running periodic cleanup')
        self._abort_timed_out_jobs()
        self._abort_jobs_past_synch_start_timeout()
        self._abort_jobs_past_max_runtime()
        self._clear_inactive_blocks()
        self._check_for_db_inconsistencies()


    def _abort_timed_out_jobs(self):
        msg = 'Aborting all jobs that have timed out and are not complete'
        logging.info(msg)
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for job in query.distinct():
            logging.warning('Aborting job %d due to job timeout', job.id)
            job.abort(None)


    def _abort_jobs_past_synch_start_timeout(self):
        """
        Abort synchronous jobs that are past the start timeout (from global
        config) and are holding a machine that's in the Everyone ACL group.
        """
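        # synch_job_start_timeout_minutes is read from the global config; it
        # is assumed to live in the SCHEDULER section alongside the other
        # scheduler settings, e.g.:
        #
        #     [SCHEDULER]
        #     synch_job_start_timeout_minutes: 120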
        msg = 'Aborting synchronous jobs that are past the start timeout'
        logging.info(msg)
        timeout_delta = datetime.timedelta(
            minutes=scheduler_config.config.synch_job_start_timeout_minutes)
        timeout_start = datetime.datetime.now() - timeout_delta
        query = models.Job.objects.filter(
            created_on__lt=timeout_start,
            hostqueueentry__status='Pending',
            hostqueueentry__host__aclgroup__name='Everyone')
        for job in query.distinct():
            logging.warning('Aborting job %d due to start timeout', job.id)
            entries_to_abort = job.hostqueueentry_set.exclude(
                status=models.HostQueueEntry.Status.RUNNING)
            for queue_entry in entries_to_abort:
                queue_entry.abort(None)


    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
        rows = self._db.execute("""
            SELECT hqe.id
            FROM host_queue_entries AS hqe
            INNER JOIN jobs ON (hqe.job_id = jobs.id)
            WHERE NOT hqe.complete AND NOT hqe.aborted AND
            hqe.started_on + INTERVAL jobs.max_runtime_hrs HOUR < NOW()""")
        query = models.HostQueueEntry.objects.filter(
            id__in=[row[0] for row in rows])
        for queue_entry in query.distinct():
            logging.warning('Aborting entry %s due to max runtime', queue_entry)
            queue_entry.abort(None)


    def _check_for_db_inconsistencies(self):
        logging.info('Cleaning db inconsistencies')
        self._check_all_invalid_related_objects()


    def _check_invalid_related_objects_one_way(self, first_model,
                                               relation_field, second_model):
        """
        Report and clear relationships from invalid first_model objects to
        second_model objects, returning a list of error strings describing
        what was cleared.
        """
        if 'invalid' not in first_model.get_field_dict():
            return []
        invalid_objects = list(first_model.objects.filter(invalid=True))
        first_model.objects.populate_relationships(invalid_objects,
                                                   second_model,
                                                   'related_objects')
        error_lines = []
        for invalid_object in invalid_objects:
            if invalid_object.related_objects:
                related_list = ', '.join(str(related_object) for related_object
                                         in invalid_object.related_objects)
                error_lines.append('Invalid %s %s is related to %ss: %s'
                                   % (first_model.__name__, invalid_object,
                                      second_model.__name__, related_list))
                related_manager = getattr(invalid_object, relation_field)
                related_manager.clear()
        return error_lines


    def _check_invalid_related_objects(self, first_model, first_field,
                                       second_model, second_field):
        errors = self._check_invalid_related_objects_one_way(
            first_model, first_field, second_model)
        errors.extend(self._check_invalid_related_objects_one_way(
            second_model, second_field, first_model))
        return errors


    def _check_all_invalid_related_objects(self):
        model_pairs = ((models.Host, 'labels', models.Label, 'host_set'),
                       (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'),
                       (models.AclGroup, 'users', models.User, 'aclgroup_set'),
                       (models.Test, 'dependency_labels', models.Label,
                        'test_set'))
        errors = []
        for first_model, first_field, second_model, second_field in model_pairs:
            errors.extend(self._check_invalid_related_objects(
                first_model, first_field, second_model, second_field))

        if errors:
            subject = ('%s relationships to invalid models, cleaned all' %
                       len(errors))
            message = '\n'.join(errors)
            logging.warning(subject)
            logging.warning(message)
            email_manager.manager.enqueue_notify_email(subject, message)


    def _clear_inactive_blocks(self):
        msg = 'Clear out blocks for all completed jobs.'
        logging.info(msg)
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
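        # For reference, the roughly equivalent (but slower) NOT IN form
        # alluded to above would be:
        #
        #     DELETE FROM ineligible_host_queues
        #     WHERE job_id NOT IN (SELECT DISTINCT job_id
        #                          FROM host_queue_entries
        #                          WHERE NOT complete)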
        self._db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs at monitor_db startup and every twenty-four hours
       thereafter.
    """


    def __init__(self, db, run_at_initialize=True):
        clean_interval = 24 * 60 # 24 hours
        super(TwentyFourHourUpkeep, self).__init__(
            db, clean_interval, run_at_initialize=run_at_initialize)


    def _cleanup(self):
        logging.info('Running 24 hour clean up')
        self._django_session_cleanup()
        self._check_for_uncleanable_db_inconsistencies()


    def _django_session_cleanup(self):
        """Clean up django_session since django doesn't do it for us.
           http://www.djangoproject.com/documentation/0.96/sessions/
        """
        logging.info('Deleting old sessions from django_session')
        sql = 'DELETE FROM django_session WHERE expire_date < NOW()'
        self._db.execute(sql)


    def _check_for_uncleanable_db_inconsistencies(self):
        logging.info('Checking for uncleanable DB inconsistencies')
        self._check_for_active_and_complete_queue_entries()
        self._check_for_multiple_platform_hosts()
        self._check_for_no_platform_hosts()
        self._check_for_multiple_atomic_group_hosts()


    def _check_for_active_and_complete_queue_entries(self):
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        if query.count() != 0:
            subject = ('%d queue entries found with active=complete=1'
                       % query.count())
            lines = [str(entry.get_object_dict()) for entry in query]
            self._send_inconsistency_message(subject, lines)


    def _check_for_multiple_platform_hosts(self):
        rows = self._db.execute("""
            SELECT hosts.id, hostname, COUNT(1) AS platform_count,
                   GROUP_CONCAT(labels.name)
            FROM hosts
            INNER JOIN hosts_labels ON hosts.id = hosts_labels.host_id
            INNER JOIN labels ON hosts_labels.label_id = labels.id
            WHERE labels.platform
            GROUP BY hosts.id
            HAVING platform_count > 1
            ORDER BY hostname""")
        if rows:
            subject = '%s hosts with multiple platforms' % self._db.rowcount
            lines = [' '.join(str(item) for item in row)
                     for row in rows]
            self._send_inconsistency_message(subject, lines)


    def _check_for_no_platform_hosts(self):
        rows = self._db.execute("""
            SELECT hostname
            FROM hosts
            LEFT JOIN hosts_labels
              ON hosts.id = hosts_labels.host_id
              AND hosts_labels.label_id IN (SELECT id FROM labels
                                            WHERE platform)
            WHERE NOT hosts.invalid AND hosts_labels.host_id IS NULL""")
        if rows:
            subject = '%s hosts with no platform' % self._db.rowcount
            self._send_inconsistency_message(
                subject, [', '.join(row[0] for row in rows)])


    def _check_for_multiple_atomic_group_hosts(self):
        rows = self._db.execute("""
            SELECT hosts.id, hostname, COUNT(1) AS atomic_group_count,
                   GROUP_CONCAT(labels.name), GROUP_CONCAT(atomic_groups.name)
            FROM hosts
            INNER JOIN hosts_labels ON hosts.id = hosts_labels.host_id
            INNER JOIN labels ON hosts_labels.label_id = labels.id
            INNER JOIN atomic_groups ON
                       labels.atomic_group_id = atomic_groups.id
            WHERE NOT hosts.invalid AND NOT labels.invalid
            GROUP BY hosts.id
            HAVING atomic_group_count > 1
            ORDER BY hostname""")
        if rows:
            subject = '%s hosts with multiple atomic groups' % self._db.rowcount
            lines = [' '.join(str(item) for item in row)
                     for row in rows]
            self._send_inconsistency_message(subject, lines)


    def _send_inconsistency_message(self, subject, lines):
        logging.error(subject)
        message = '\n'.join(lines)
        if len(message) > 5000:
            message = message[:5000] + '\n(truncated)\n'
        email_manager.manager.enqueue_notify_email(subject, message)