1#pylint: disable-msg=C0111
2
3"""
4Postjob task.
5
6Postjob tasks are responsible for setting the final status of the HQE
7and Host, and scheduling additional special agents such as cleanup,
8if necessary.
9"""
10
11import os
12
13from autotest_lib.client.common_lib.cros.graphite import autotest_stats
14from autotest_lib.frontend.afe import models, model_attributes
15from autotest_lib.scheduler import agent_task, drones, drone_manager
16from autotest_lib.scheduler import email_manager, pidfile_monitor
17from autotest_lib.scheduler import scheduler_config
18from autotest_lib.server import autoserv_utils
19
20
# Location of the 'tko/parse' program under the Autotest install directory;
# FinalReparseTask uses it as the executable of its command line.
_parser_path = os.path.join(
        drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
22
23
class PostJobTask(agent_task.AgentTask):
    """
    Common base for the tasks that run once a job's autoserv process is done.

    Subclasses provide the command to execute through _generate_command()
    and the pidfile to use through _pidfile_name().
    """
    def __init__(self, queue_entries, log_file_name):
        """
        @param queue_entries: the host queue entries this task works on.
        @param log_file_name: passed through to AgentTask as the log file name.
        """
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        # monitor attached to the process that already ran in our working
        # directory; _final_status() reads the autoserv exit code from it
        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        execution_path = self._working_directory()
        self._autoserv_monitor.attach_to_existing_process(execution_path)


    def _command_line(self):
        """Return the command for this task, run against the results dir."""
        # Do we need testing_mode?
        results_dir = self._drone_manager.absolute_path(
                self._working_directory())
        return self._generate_command(results_dir)


    def _generate_command(self, results_dir):
        """
        Build the command to run; must be overridden.

        @param results_dir: absolute path of the results directory.
        """
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        """Owner of the job that the first queue entry belongs to."""
        first_entry = self.queue_entries[0]
        return first_entry.job.owner


    def _working_directory(self):
        """Return the execution path shared by all of the queue entries."""
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        """Return the monitor of the autoserv process this task pairs with."""
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """
        Refresh the queue entries from the database and report abort state.

        @returns the abort flag shared by the entries.  If the entries
                disagree, a notification email is enqueued and True is
                returned.  None is returned when there are no entries.
        """
        first_state = None
        for position, queue_entry in enumerate(self.queue_entries):
            queue_entry.update_from_database()
            current_state = bool(queue_entry.aborted)
            if position == 0:
                # the first entry defines the state the others must match
                first_state = current_state
                continue
            if current_state == first_state:
                continue

            described = []
            for entry in self.queue_entries:
                described.append('%s (aborted: %s)' % (entry, entry.aborted))
            body = ('Queue entries have inconsistent abort state:\n' +
                    '\n'.join(described))
            email_manager.manager.enqueue_notify_email(
                    'Inconsistent abort state', body)
            # don't crash here, just assume true
            return True
        return first_state


    def _final_status(self):
        """
        Return the HostQueueEntry status the job should end up with.

        ABORTED wins when the job was aborted; otherwise the status is
        COMPLETED for an autoserv exit code of 0 and FAILED for anything else.
        """
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        autoserv_succeeded = self._autoserv_monitor.exit_code() == 0
        if autoserv_succeeded:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        """Set the given status on every queue entry of this task."""
        for entry in self.queue_entries:
            entry.set_status(status)


    def abort(self):
        """Do nothing: post-job tasks keep going when the job is aborted."""
        # override AgentTask.abort() to avoid killing the process and ending
        # the task.  post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        """Return the pidfile name without its first char and '_execute'."""
        # '.autoserv_execute' -> 'autoserv'
        pidfile_name = self._pidfile_name()
        suffix_length = len('_execute')
        return pidfile_name[1:-suffix_length]
100
101
class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own process limit.

    We throttle tasks like parsing because we don't want them to
    hold up tests. At the same time we don't wish to build up load
    that will take forever to parse.

    Subclasses must implement _max_processes().  The counters below are
    updated through `cls`, so a subclass that starts processes ends up with
    its own values rather than sharing the ones defined here.
    """
    # Number of processes started by _try_starting_process() for which
    # finished() has not been called yet.
    _num_running_processes = 0
    # Last known limit of max processes, used to check whether
    # max processes config has been changed.
    _last_known_max_processes = 0
    # Whether an email should be sent to notify process limit being hit.
    _notification_on = True
    # Once process limit is hit, an email will be sent.
    # To prevent spams, do not send another email until
    # it drops to lower than the following level.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80


    @classmethod
    def _increment_running_processes(cls):
        """Count one more running process and send the new count to stats."""
        cls._num_running_processes += 1
        autotest_stats.Gauge('scheduler').send(
                '%s.num_running_processes' % cls.__name__,
                cls._num_running_processes)


    @classmethod
    def _decrement_running_processes(cls):
        """Count one less running process and send the new count to stats."""
        cls._num_running_processes -= 1
        autotest_stats.Gauge('scheduler').send(
                '%s.num_running_processes' % cls.__name__,
                cls._num_running_processes)


    @classmethod
    def _max_processes(cls):
        """Return the process limit for this class; must be overridden."""
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        """Return True while the running count is below the limit."""
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        """Return True once this task has a (truthy) monitor."""
        return bool(self.monitor)


    def tick(self):
        """Keep trying to start the process; afterwards tick normally."""
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        """Start the process only if the process limit allows it."""
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that process limit is hit.

        Nothing is sent while _notification_on is False; sending an email
        turns the switch off so that the same condition is not reported again
        until _reset_notification_switch_if_necessary() re-enables it.
        """
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True on the following cases:
        1) If the limit of max processes configuration changes;
        2) If _notification_on is False and the number of running processes
           drops to lower than a level defined in REVIVE_NOTIFICATION_THRESHOLD.

        When the limit is zero (or negative) no usage ratio can be computed,
        so case 2 is skipped and the switch is left untouched.

        """
        # read the limit once so that every check below sees the same value
        max_processes = cls._max_processes()
        if cls._last_known_max_processes != max_processes:
            cls._notification_on = True
            cls._last_known_max_processes = max_processes
            return
        if max_processes <= 0:
            # A limit of 0 matches the initial _last_known_max_processes, so
            # we get here with nothing to divide by; avoid ZeroDivisionError.
            return
        percentage = float(cls._num_running_processes) / max_processes
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
        """Run the task if below the process limit, else notify and wait."""
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        """Finish the task and release its slot in the running count.

        @param success: passed through to the parent class' finished().
        """
        super(SelfThrottledPostJobTask, self).finished(success)
        # mirrors the increment done in _try_starting_process()
        if self._process_started():
            self._decrement_running_processes()
213
214
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        """
        @param queue_entries: the host queue entries of the job; the job is
                taken from the first entry.
        @param recover_run_monitor: not used by this constructor; kept so
                existing callers that pass it keep working.
        """
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
            queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        """
        Build the autoserv command that collects crash info.

        @param results_dir: results directory handed to autoserv with -r.
        @returns the command as a list of arguments.
        """
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path , '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        """Number of processes accounted to this task: one per queue entry."""
        return len(self.queue_entries)


    def _pidfile_name(self):
        """Return the name of the crashinfo pidfile."""
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        """Check that entries are GATHERING on RUNNING hosts, then proceed."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        """Hand the results over for parsing and settle the hosts' fate."""
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        """
        Create a cleanup task for each host, or mark the hosts READY.

        A cleanup is requested when the job was aborted, when the job asks
        for a reboot always, when it asks for one if all tests passed and
        they did, or when any test failed.
        """
        has_process = self._autoserv_monitor.has_process()
        # Evaluate the final status exactly once: every call refreshes all
        # queue entries from the database and may enqueue a notification
        # email, and two separate calls could disagree with each other.
        final_status = self._final_status()
        if has_process:
            final_success = (final_status ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                final_status == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        """Collect crash info only when autoserv did not exit normally."""
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            # nothing to gather; finish right away as a success
            self.finished(True)
302
303
class FinalReparseTask(SelfThrottledPostJobTask):
    """Throttled post-job task that runs the parser on a job's results."""
    def __init__(self, queue_entries):
        """
        @param queue_entries: the host queue entries whose results get parsed.
        """
        super(FinalReparseTask, self).__init__(
                queue_entries, log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [hqe.id for hqe in queue_entries]


    def _generate_command(self, results_dir):
        """
        Build the parser command line.

        @param results_dir: results directory, passed as the last argument.
        @returns the command as a list of arguments.
        """
        parser_flags = ['--write-pidfile', '--record-duration',
                        '-l', '2', '-r', '-o']
        return [_parser_path] + parser_flags + [results_dir]


    @property
    def num_processes(self):
        """Always 0, so that parsers are left out of process accounting."""
        # don't include parser processes in accounting
        return 0


    def _pidfile_name(self):
        """Return the name of the parser pidfile."""
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        """Return the configured maximum number of parse processes."""
        config = scheduler_config.config
        return config.max_parse_processes


    def prolog(self):
        """Check that every queue entry is PARSING, then proceed."""
        parsing_only = (models.HostQueueEntry.Status.PARSING,)
        self._check_queue_entry_statuses(
                self.queue_entries, allowed_hqe_statuses=parsing_only)

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        """Run the parent epilog, then pass the entries on for archiving."""
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)
342
343
class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Throttled post-job task that runs the results archiving control file."""
    # name of the marker file written to the working directory on failure
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        """
        @param queue_entries: the host queue entries whose results get
                archived.
        """
        super(ArchiveResultsTask, self).__init__(
                queue_entries, log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [hqe.id for hqe in queue_entries]


    def _pidfile_name(self):
        """Return the name of the archiver pidfile."""
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        """
        Build the autoserv command that runs the archiving control file.

        @param results_dir: results directory handed to autoserv with -r.
        @returns the command as a list of arguments.
        """
        pidfile_label_arg = '--pidfile-label=%s' % self._pidfile_label()
        control_file = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                                    'archive_results.control.srv')
        command = [autoserv_utils.autoserv_path, '-p', pidfile_label_arg]
        command += ['-r', results_dir]
        command += ['--use-existing-results',
                    '--control-filename=control.archive']
        command.append(control_file)
        return command


    @classmethod
    def _max_processes(cls):
        """Return the configured maximum number of transfer processes."""
        config = scheduler_config.config
        return config.max_transfer_processes


    def prolog(self):
        """Check that every queue entry is ARCHIVING, then proceed."""
        archiving_only = (models.HostQueueEntry.Status.ARCHIVING,)
        self._check_queue_entry_statuses(
                self.queue_entries, allowed_hqe_statuses=archiving_only)

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        """
        Record an archiving failure, then set the entries' final status.

        On failure, and only if the paired monitor has a process, a marker
        file holding the archiver's exit code is written to the working
        directory.
        """
        super(ArchiveResultsTask, self).epilog()
        archiving_failed = not self.success
        if archiving_failed and self._paired_with_monitor().has_process():
            marker_path = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            marker_lines = ['Archiving failed with exit code %s'
                            % self.monitor.exit_code()]
            self._drone_manager.write_lines_to_file(
                    marker_path, marker_lines,
                    paired_with_process=paired_process)
        final_status = self._final_status()
        self._set_all_statuses(final_status)
391