1"""
2Autotest AFE Cleanup used by the scheduler
3"""
4
5
6import logging
7import random
8import time
9
10from autotest_lib.client.common_lib import utils
11from autotest_lib.frontend.afe import models
12from autotest_lib.scheduler import email_manager
13from autotest_lib.scheduler import scheduler_config
14from autotest_lib.client.common_lib import global_config
15from autotest_lib.client.common_lib import host_protections
16
17try:
18    from chromite.lib import metrics
19except ImportError:
20    metrics = utils.metrics_mock
21
22
class PeriodicCleanup(object):
    """Base class to schedule periodical cleanup work.

    Subclasses implement _cleanup(). The scheduler calls initialize() once
    at startup and run_cleanup_maybe() on each tick; the latter invokes
    _cleanup() whenever clean_interval_minutes has elapsed since the last run.
    """

    def __init__(self, db, clean_interval_minutes, run_at_initialize=False):
        """Initialize PeriodicCleanup.

        @param db: Database connection object.
        @param clean_interval_minutes: Minutes between consecutive cleanups.
        @param run_at_initialize: True to also run cleanup when the
                                  scheduler starts up.
        """
        self._db = db
        self.clean_interval_minutes = clean_interval_minutes
        # Treat startup time as the last cleanup so the first periodic run
        # happens a full interval after initialization.
        self._last_clean_time = time.time()
        self._run_at_initialize = run_at_initialize


    def initialize(self):
        """Method called by scheduler at the startup.
        """
        if self._run_at_initialize:
            self._cleanup()


    def run_cleanup_maybe(self):
        """Test if cleanup method should be called.
        """
        should_cleanup = (self._last_clean_time +
                          self.clean_interval_minutes * 60
                          < time.time())
        if should_cleanup:
            self._cleanup()
            self._last_clean_time = time.time()


    def _cleanup(self):
        """Abstract cleanup method; subclasses must override."""
        raise NotImplementedError
55
56
class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
       clean_interval_minutes in the SCHEDULER section.
    """

    def __init__(self, db, clean_interval_minutes):
        """Initialize UserCleanup.

        @param db: Database connection object.
        @param clean_interval_minutes: Minutes between consecutive cleanups.
        """
        super(UserCleanup, self).__init__(db, clean_interval_minutes)
        self._last_reverify_time = time.time()


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/scheduler/cleanup/user/durations')
    def _cleanup(self):
        """Run one round of periodic cleanup tasks."""
        logging.info('Running periodic cleanup')
        self._abort_timed_out_jobs()
        self._abort_jobs_past_max_runtime()
        self._clear_inactive_blocks()
        self._check_for_db_inconsistencies()
        self._reverify_dead_hosts()
        self._django_session_cleanup()


    def _abort_timed_out_jobs(self):
        """Abort jobs whose timeout_mins has elapsed and are not complete."""
        msg = 'Aborting all jobs that have timed out and are not complete'
        logging.info(msg)
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
        for job in query.distinct():
            logging.warning('Aborting job %d due to job timeout', job.id)
            job.abort()


    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
        rows = self._db.execute("""
            SELECT hqe.id FROM afe_host_queue_entries AS hqe
            WHERE NOT hqe.complete AND NOT hqe.aborted AND EXISTS
            (select * from afe_jobs where hqe.job_id=afe_jobs.id and
             hqe.started_on + INTERVAL afe_jobs.max_runtime_mins MINUTE < NOW())
            """)
        query = models.HostQueueEntry.objects.filter(
            id__in=[row[0] for row in rows])
        for queue_entry in query.distinct():
            logging.warning('Aborting entry %s due to max runtime', queue_entry)
            queue_entry.abort()


    def _check_for_db_inconsistencies(self):
        """Clean up inconsistencies between invalid objects and relations."""
        logging.info('Cleaning db inconsistencies')
        self._check_all_invalid_related_objects()


    def _check_invalid_related_objects_one_way(self, first_model,
                                               relation_field, second_model):
        """Clear relations dangling off invalid first_model objects.

        @param first_model: Model class whose invalid rows are checked.
        @param relation_field: Name of the relation attribute to clear.
        @param second_model: Related model class, used for reporting.

        @return: List of human-readable error strings, one per invalid
                 object that still had related objects.
        """
        # Models without an 'invalid' flag cannot have invalid rows.
        if 'invalid' not in first_model.get_field_dict():
            return []
        invalid_objects = list(first_model.objects.filter(invalid=True))
        first_model.objects.populate_relationships(invalid_objects,
                                                   second_model,
                                                   'related_objects')
        error_lines = []
        for invalid_object in invalid_objects:
            if invalid_object.related_objects:
                related_list = ', '.join(str(related_object) for related_object
                                         in invalid_object.related_objects)
                error_lines.append('Invalid %s %s is related to %ss: %s'
                                   % (first_model.__name__, invalid_object,
                                      second_model.__name__, related_list))
                related_manager = getattr(invalid_object, relation_field)
                related_manager.clear()
        return error_lines


    def _check_invalid_related_objects(self, first_model, first_field,
                                       second_model, second_field):
        """Check both directions of a relation for invalid objects.

        @return: Combined list of error strings from both directions.
        """
        errors = self._check_invalid_related_objects_one_way(
            first_model, first_field, second_model)
        errors.extend(self._check_invalid_related_objects_one_way(
            second_model, second_field, first_model))
        return errors


    def _check_all_invalid_related_objects(self):
        """Scan all known model relation pairs and clear invalid relations."""
        model_pairs = ((models.Host, 'labels', models.Label, 'host_set'),
                       (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'),
                       (models.AclGroup, 'users', models.User, 'aclgroup_set'),
                       (models.Test, 'dependency_labels', models.Label,
                        'test_set'))
        errors = []
        for first_model, first_field, second_model, second_field in model_pairs:
            errors.extend(self._check_invalid_related_objects(
                first_model, first_field, second_model, second_field))

        if errors:
            m = 'chromeos/autotest/scheduler/cleanup/invalid_models_cleaned'
            metrics.Counter(m).increment_by(len(errors))
            # logging.warn is deprecated; use warning() with lazy args.
            logging.warning('Cleaned invalid models due to errors: %s',
                            '\n'.join(errors))

    def _clear_inactive_blocks(self):
        """Delete ineligible-host-queue rows for fully completed jobs."""
        msg = 'Clear out blocks for all completed jobs.'
        logging.info(msg)
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        self._db.execute("""
                DELETE ihq FROM afe_ineligible_host_queues ihq
                WHERE NOT EXISTS
                    (SELECT job_id FROM afe_host_queue_entries hqe
                     WHERE NOT hqe.complete AND hqe.job_id = ihq.job_id)""")


    def _should_reverify_hosts_now(self):
        """Return True if the reverify period has elapsed (0 disables it)."""
        reverify_period_sec = (scheduler_config.config.reverify_period_minutes
                               * 60)
        if reverify_period_sec == 0:
            return False
        return (self._last_reverify_time + reverify_period_sec) <= time.time()


    def _choose_subset_of_hosts_to_reverify(self, hosts):
        """Given hosts needing verification, return a subset to reverify."""
        max_at_once = scheduler_config.config.reverify_max_hosts_at_once
        if (max_at_once > 0 and len(hosts) > max_at_once):
            return random.sample(hosts, max_at_once)
        return sorted(hosts)


    def _reverify_dead_hosts(self):
        """Schedule verify tasks for repair-failed hosts, rate-limited."""
        if not self._should_reverify_hosts_now():
            return

        self._last_reverify_time = time.time()
        logging.info('Checking for dead hosts to reverify')
        hosts = models.Host.objects.filter(
                status=models.Host.Status.REPAIR_FAILED,
                locked=False,
                invalid=False)
        hosts = hosts.exclude(
                protection=host_protections.Protection.DO_NOT_VERIFY)
        if not hosts:
            return

        hosts = list(hosts)
        total_hosts = len(hosts)
        hosts = self._choose_subset_of_hosts_to_reverify(hosts)
        logging.info('Reverifying dead hosts (%d of %d) %s', len(hosts),
                     total_hosts, ', '.join(host.hostname for host in hosts))
        for host in hosts:
            models.SpecialTask.schedule_special_task(
                    host=host, task=models.SpecialTask.Task.VERIFY)


    def _django_session_cleanup(self):
        """Clean up django_session since django doesn't for us.
           http://www.djangoproject.com/documentation/0.96/sessions/
        """
        logging.info('Deleting old sessions from django_session')
        # NOTE(review): TRUNCATE drops ALL sessions, not just expired ones;
        # presumably acceptable for this deployment — confirm before changing.
        sql = 'TRUNCATE TABLE django_session'
        self._db.execute(sql)
220
221
class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs at the startup of monitor_db and every subsequent
       twenty four hours.
    """


    def __init__(self, db, drone_manager, run_at_initialize=True):
        """Initialize TwentyFourHourUpkeep.

        @param db: Database connection object.
        @param drone_manager: DroneManager to access drones.
        @param run_at_initialize: True to run cleanup when scheduler starts.
                                  Default is set to True.

        """
        self.drone_manager = drone_manager
        clean_interval_minutes = 24 * 60 # 24 hours
        super(TwentyFourHourUpkeep, self).__init__(
            db, clean_interval_minutes, run_at_initialize=run_at_initialize)


    @metrics.SecondsTimerDecorator(
        'chromeos/autotest/scheduler/cleanup/daily/durations')
    def _cleanup(self):
        """Run the daily cleanup tasks."""
        logging.info('Running 24 hour clean up')
        self._check_for_uncleanable_db_inconsistencies()
        self._cleanup_orphaned_containers()


    def _check_for_uncleanable_db_inconsistencies(self):
        """Report DB inconsistencies that need manual attention."""
        logging.info('Checking for uncleanable DB inconsistencies')
        self._check_for_active_and_complete_queue_entries()
        self._check_for_multiple_platform_hosts()
        self._check_for_no_platform_hosts()


    def _check_for_active_and_complete_queue_entries(self):
        """Report (and partially fix) entries that are both active and
        complete; aborted ones are auto-corrected by clearing active.
        """
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        # Cache the count so we don't issue the COUNT query twice.
        num_entries = query.count()
        if num_entries != 0:
            subject = ('%d queue entries found with active=complete=1'
                       % num_entries)
            lines = []
            for entry in query:
                lines.append(str(entry.get_object_dict()))
                if entry.status == 'Aborted':
                    logging.error('Aborted entry: %s is both active and '
                                  'complete. Setting active value to False.',
                                  str(entry))
                    entry.active = False
                    entry.save()
            self._send_inconsistency_message(subject, lines)


    def _check_for_multiple_platform_hosts(self):
        """Report hosts that carry more than one platform label."""
        rows = self._db.execute("""
            SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count,
                   GROUP_CONCAT(afe_labels.name)
            FROM afe_hosts
            INNER JOIN afe_hosts_labels ON
                    afe_hosts.id = afe_hosts_labels.host_id
            INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
            WHERE afe_labels.platform
            GROUP BY afe_hosts.id
            HAVING platform_count > 1
            ORDER BY hostname""")
        if rows:
            subject = '%s hosts with multiple platforms' % self._db.rowcount
            lines = [' '.join(str(item) for item in row)
                     for row in rows]
            self._send_inconsistency_message(subject, lines)


    def _check_for_no_platform_hosts(self):
        """Log valid hosts that have no platform label at all."""
        rows = self._db.execute("""
            SELECT hostname
            FROM afe_hosts
            LEFT JOIN afe_hosts_labels
              ON afe_hosts.id = afe_hosts_labels.host_id
              AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels
                                                WHERE platform)
            WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id IS NULL""")
        if rows:
            logging.warning('%s hosts with no platform\n%s', self._db.rowcount,
                            ', '.join(row[0] for row in rows))


    def _send_inconsistency_message(self, subject, lines):
        """Log and email an inconsistency report, truncating long bodies.

        @param subject: Email subject line (also logged as an error).
        @param lines: List of detail strings joined into the message body.
        """
        logging.error(subject)
        message = '\n'.join(lines)
        # Keep notification emails to a manageable size.
        if len(message) > 5000:
            message = message[:5000] + '\n(truncated)\n'
        email_manager.manager.enqueue_notify_email(subject, message)


    def _cleanup_orphaned_containers(self):
        """Cleanup orphaned containers in each drone.

        The function queues a lxc_cleanup call in each drone without waiting for
        the script to finish, as the cleanup procedure could take minutes and the
        script output is logged.

        """
        # Read as bool: without type=bool the config value is a string, and
        # any non-empty string (including 'False') is truthy, so the skip
        # branch below would never trigger correctly.
        ssp_enabled = global_config.global_config.get_config_value(
                'AUTOSERV', 'enable_ssp_container', type=bool, default=True)
        if not ssp_enabled:
            logging.info('Server-side packaging is not enabled, no need to clean'
                         ' up orphaned containers.')
            return
        self.drone_manager.cleanup_orphaned_containers()
331