1"""
2Autotest AFE Cleanup used by the scheduler
3"""
4
5
6import time, logging, random
7
8from autotest_lib.frontend.afe import models
9from autotest_lib.scheduler import email_manager
10from autotest_lib.scheduler import scheduler_config
11from autotest_lib.client.common_lib import global_config
12from autotest_lib.client.common_lib import host_protections
13from autotest_lib.client.common_lib.cros.graphite import autotest_stats
14
15
class PeriodicCleanup(object):
    """Base class to schedule periodic cleanup work.
    """

    def __init__(self, db, clean_interval_minutes, run_at_initialize=False):
        """Initialize the periodic cleanup.

        @param db: Database connection object.
        @param clean_interval_minutes: Minutes to wait between cleanup runs.
        @param run_at_initialize: True to also run the cleanup when
                                  initialize() is called.
        """
        self._db = db
        self.clean_interval_minutes = clean_interval_minutes
        self._last_clean_time = time.time()
        self._run_at_initialize = run_at_initialize


    def initialize(self):
        """Method called by the scheduler at startup.
        """
        if self._run_at_initialize:
            self._cleanup()


    def run_cleanup_maybe(self):
        """Run the cleanup if the clean interval has elapsed since last run.
        """
        should_cleanup = (self._last_clean_time +
                          self.clean_interval_minutes * 60
                          < time.time())
        if should_cleanup:
            self._cleanup()
            self._last_clean_time = time.time()


    def _cleanup(self):
        """Abstract cleanup method."""
        raise NotImplementedError


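# Illustrative usage sketch only (hypothetical driver code, not part of this
# module): the scheduler constructs the concrete cleanup objects once, calls
# initialize() at startup, and then calls run_cleanup_maybe() on every tick,
# roughly:
#
#   user_cleanup = UserCleanup(db, clean_interval_minutes)
#   upkeep = TwentyFourHourUpkeep(db, drone_manager)
#   for worker in (user_cleanup, upkeep):
#       worker.initialize()
#   while tick():                      # hypothetical scheduler loop
#       for worker in (user_cleanup, upkeep):
#           worker.run_cleanup_maybe()
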
class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
       clean_interval_minutes in the SCHEDULER section.
    """
    timer = autotest_stats.Timer('monitor_db_cleanup.user_cleanup')


    def __init__(self, db, clean_interval_minutes):
        super(UserCleanup, self).__init__(db, clean_interval_minutes)
        self._last_reverify_time = time.time()


    @timer.decorate
    def _cleanup(self):
        logging.info('Running periodic cleanup')
        self._abort_timed_out_jobs()
        self._abort_jobs_past_max_runtime()
        self._clear_inactive_blocks()
        self._check_for_db_inconsistencies()
        self._reverify_dead_hosts()
        self._django_session_cleanup()


    @timer.decorate
    def _abort_timed_out_jobs(self):
        msg = 'Aborting all jobs that have timed out and are not complete'
        logging.info(msg)
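        # The timeout comparison (created_on + timeout_mins minutes vs. NOW())
        # mixes two columns in date arithmetic, so it is expressed as raw SQL
        # via extra() below.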
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
        for job in query.distinct():
            logging.warning('Aborting job %d due to job timeout', job.id)
            job.abort()


    @timer.decorate
    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
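        # The matching entries are found with raw SQL (the runtime check needs
        # date arithmetic across afe_host_queue_entries and afe_jobs) and are
        # then re-fetched through the ORM so that abort() can be called.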
        rows = self._db.execute("""
            SELECT hqe.id
            FROM afe_host_queue_entries AS hqe
            INNER JOIN afe_jobs ON (hqe.job_id = afe_jobs.id)
            WHERE NOT hqe.complete AND NOT hqe.aborted AND
            hqe.started_on + INTERVAL afe_jobs.max_runtime_mins MINUTE <
            NOW()""")
        query = models.HostQueueEntry.objects.filter(
            id__in=[row[0] for row in rows])
        for queue_entry in query.distinct():
            logging.warning('Aborting entry %s due to max runtime', queue_entry)
            queue_entry.abort()


    @timer.decorate
    def _check_for_db_inconsistencies(self):
        logging.info('Cleaning db inconsistencies')
        self._check_all_invalid_related_objects()


    def _check_invalid_related_objects_one_way(self, first_model,
                                               relation_field, second_model):
        """Clear relations from invalid first_model objects to second_model.

        @return A list of error strings describing the cleared relationships.
        """
        if 'invalid' not in first_model.get_field_dict():
            return []
        invalid_objects = list(first_model.objects.filter(invalid=True))
        first_model.objects.populate_relationships(invalid_objects,
                                                   second_model,
                                                   'related_objects')
        error_lines = []
        for invalid_object in invalid_objects:
            if invalid_object.related_objects:
                related_list = ', '.join(str(related_object) for related_object
                                         in invalid_object.related_objects)
                error_lines.append('Invalid %s %s is related to %ss: %s'
                                   % (first_model.__name__, invalid_object,
                                      second_model.__name__, related_list))
                related_manager = getattr(invalid_object, relation_field)
                related_manager.clear()
        return error_lines


    def _check_invalid_related_objects(self, first_model, first_field,
                                       second_model, second_field):
        errors = self._check_invalid_related_objects_one_way(
            first_model, first_field, second_model)
        errors.extend(self._check_invalid_related_objects_one_way(
            second_model, second_field, first_model))
        return errors


    def _check_all_invalid_related_objects(self):
        model_pairs = ((models.Host, 'labels', models.Label, 'host_set'),
                       (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'),
                       (models.AclGroup, 'users', models.User, 'aclgroup_set'),
                       (models.Test, 'dependency_labels', models.Label,
                        'test_set'))
        errors = []
        for first_model, first_field, second_model, second_field in model_pairs:
            errors.extend(self._check_invalid_related_objects(
                first_model, first_field, second_model, second_field))

        if errors:
            subject = ('%s relationships to invalid models, cleaned all' %
                       len(errors))
            message = '\n'.join(errors)
            logging.warning(subject)
            logging.warning(message)
            email_manager.manager.enqueue_notify_email(subject, message)


    @timer.decorate
    def _clear_inactive_blocks(self):
        msg = 'Clear out blocks for all completed jobs.'
        logging.info(msg)
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
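        # For reference, the roughly equivalent NOT IN form would be (sketch
        # only, not executed):
        #   DELETE FROM afe_ineligible_host_queues WHERE job_id NOT IN
        #       (SELECT DISTINCT job_id FROM afe_host_queue_entries
        #        WHERE NOT complete)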
        self._db.execute("""
            DELETE ihq FROM afe_ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM afe_host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")


    def _should_reverify_hosts_now(self):
        reverify_period_sec = (scheduler_config.config.reverify_period_minutes
                               * 60)
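        # Illustrative config sketch only (SCHEDULER keys read here and in
        # _choose_subset_of_hosts_to_reverify; the values are examples, not
        # recommended defaults):
        #
        #   [SCHEDULER]
        #   reverify_period_minutes: 60
        #   reverify_max_hosts_at_once: 30
        #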
        if reverify_period_sec == 0:
            return False
        return (self._last_reverify_time + reverify_period_sec) <= time.time()


    def _choose_subset_of_hosts_to_reverify(self, hosts):
        """Given hosts needing verification, return a subset to reverify."""
        max_at_once = scheduler_config.config.reverify_max_hosts_at_once
        if (max_at_once > 0 and len(hosts) > max_at_once):
            return random.sample(hosts, max_at_once)
        return sorted(hosts)


    @timer.decorate
    def _reverify_dead_hosts(self):
        if not self._should_reverify_hosts_now():
            return

        self._last_reverify_time = time.time()
        logging.info('Checking for dead hosts to reverify')
        hosts = models.Host.objects.filter(
                status=models.Host.Status.REPAIR_FAILED,
                locked=False,
                invalid=False)
        hosts = hosts.exclude(
                protection=host_protections.Protection.DO_NOT_VERIFY)
        if not hosts:
            return

        hosts = list(hosts)
        total_hosts = len(hosts)
        hosts = self._choose_subset_of_hosts_to_reverify(hosts)
        logging.info('Reverifying dead hosts (%d of %d) %s', len(hosts),
                     total_hosts, ', '.join(host.hostname for host in hosts))
        for host in hosts:
            models.SpecialTask.schedule_special_task(
                    host=host, task=models.SpecialTask.Task.VERIFY)


    @timer.decorate
    def _django_session_cleanup(self):
        """Clean up the django_session table; Django does not do it for us.
           http://www.djangoproject.com/documentation/0.96/sessions/
        """
        logging.info('Deleting old sessions from django_session')
        sql = 'TRUNCATE TABLE django_session'
        self._db.execute(sql)
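        # Note: the TRUNCATE above drops every session row, valid or not; a
        # gentler (but slower) alternative would be something along the lines
        # of (sketch only, not executed here):
        #   DELETE FROM django_session WHERE expire_date < NOW()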


class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs at the startup of monitor_db and every subsequent
       twenty four hours.
    """
    timer = autotest_stats.Timer('monitor_db_cleanup.twentyfourhour_cleanup')


    def __init__(self, db, drone_manager, run_at_initialize=True):
        """Initialize TwentyFourHourUpkeep.

        @param db: Database connection object.
        @param drone_manager: DroneManager to access drones.
        @param run_at_initialize: True to run cleanup when scheduler starts.
                                  Default is set to True.

        """
        self.drone_manager = drone_manager
        clean_interval_minutes = 24 * 60 # 24 hours
        super(TwentyFourHourUpkeep, self).__init__(
            db, clean_interval_minutes, run_at_initialize=run_at_initialize)


    @timer.decorate
    def _cleanup(self):
        logging.info('Running 24 hour clean up')
        self._check_for_uncleanable_db_inconsistencies()
        self._cleanup_orphaned_containers()


    @timer.decorate
    def _check_for_uncleanable_db_inconsistencies(self):
        logging.info('Checking for uncleanable DB inconsistencies')
        self._check_for_active_and_complete_queue_entries()
        self._check_for_multiple_platform_hosts()
        self._check_for_no_platform_hosts()
        self._check_for_multiple_atomic_group_hosts()


    @timer.decorate
    def _check_for_active_and_complete_queue_entries(self):
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        if query.count() != 0:
            subject = ('%d queue entries found with active=complete=1'
                       % query.count())
            lines = []
            for entry in query:
                lines.append(str(entry.get_object_dict()))
                if entry.status == 'Aborted':
                    logging.error('Aborted entry: %s is both active and '
                                  'complete. Setting active value to False.',
                                  str(entry))
                    entry.active = False
                    entry.save()
            self._send_inconsistency_message(subject, lines)


    @timer.decorate
    def _check_for_multiple_platform_hosts(self):
        rows = self._db.execute("""
            SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count,
                   GROUP_CONCAT(afe_labels.name)
            FROM afe_hosts
            INNER JOIN afe_hosts_labels ON
                    afe_hosts.id = afe_hosts_labels.host_id
            INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
            WHERE afe_labels.platform
            GROUP BY afe_hosts.id
            HAVING platform_count > 1
            ORDER BY hostname""")
        if rows:
            subject = '%s hosts with multiple platforms' % self._db.rowcount
            lines = [' '.join(str(item) for item in row)
                     for row in rows]
            self._send_inconsistency_message(subject, lines)


    @timer.decorate
    def _check_for_no_platform_hosts(self):
        rows = self._db.execute("""
            SELECT hostname
            FROM afe_hosts
            LEFT JOIN afe_hosts_labels
              ON afe_hosts.id = afe_hosts_labels.host_id
              AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels
                                                WHERE platform)
            WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id IS NULL""")
        if rows:
            logging.warning('%s hosts with no platform\n%s', self._db.rowcount,
                            ', '.join(row[0] for row in rows))


    @timer.decorate
    def _check_for_multiple_atomic_group_hosts(self):
        rows = self._db.execute("""
            SELECT afe_hosts.id, hostname,
                   COUNT(DISTINCT afe_atomic_groups.name) AS atomic_group_count,
                   GROUP_CONCAT(afe_labels.name),
                   GROUP_CONCAT(afe_atomic_groups.name)
            FROM afe_hosts
            INNER JOIN afe_hosts_labels ON
                    afe_hosts.id = afe_hosts_labels.host_id
            INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
            INNER JOIN afe_atomic_groups ON
                       afe_labels.atomic_group_id = afe_atomic_groups.id
            WHERE NOT afe_hosts.invalid AND NOT afe_labels.invalid
            GROUP BY afe_hosts.id
            HAVING atomic_group_count > 1
            ORDER BY hostname""")
        if rows:
            subject = '%s hosts with multiple atomic groups' % self._db.rowcount
            lines = [' '.join(str(item) for item in row)
                     for row in rows]
            self._send_inconsistency_message(subject, lines)


    def _send_inconsistency_message(self, subject, lines):
        logging.error(subject)
        message = '\n'.join(lines)
        if len(message) > 5000:
            message = message[:5000] + '\n(truncated)\n'
        email_manager.manager.enqueue_notify_email(subject, message)


    @timer.decorate
    def _cleanup_orphaned_containers(self):
        """Clean up orphaned containers on each drone.

        This queues an lxc_cleanup call on each drone without waiting for
        the script to finish, since the cleanup procedure could take minutes;
        the script's output is logged.

        """
        ssp_enabled = global_config.global_config.get_config_value(
                'AUTOSERV', 'enable_ssp_container')
        if not ssp_enabled:
            logging.info('Server-side packaging is not enabled, no need to clean'
                         ' up orphaned containers.')
            return
        self.drone_manager.cleanup_orphaned_containers()