agent_task.py revision cc9fc70587d37775673e47b3dcb4d6ded0c6dcb4
#pylint: disable-msg=C0111

""" This is the module for everything related to the AgentTask.

The BaseAgentTask imposes an interface through which the scheduler can monitor
a process; examples of such processes include Verify, Cleanup and the
QueueTasks that run the tests. The scheduler itself only understands Agents.
Agents:
    The Agent is the bridge between the scheduler and the AgentTask. The
    scheduler's tick has a method called handle_agents, which calls the
    tick of each agent in the Dispatcher's queue. This leads to the Agent
    polling its AgentTask. The scheduler will keep polling a task through
    the associated Agent until the Agent is removed from the dispatcher.

    At a high level:
        agent finished = task done
        agent polls till finished
            task polls till done
                task sets done
        agent is removed from dispatcher
AgentTasks:
    Basic AgentTasks are created when an HQE changes state. Examples of these
    are the QueueTask, which is created when an HQE enters the Starting state,
    and the FinalReparseTask, which is created when the HQE enters Parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is
    inserted into the afe_special_tasks table. All PrejobTasks are
    SpecialAgentTasks.

Monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface, as
follows. At a high level (a rough pseudocode sketch follows this outline):
    task poll
        start
            prolog
        tick till we get an exit code
        finished(exit==0)
            done=True
            epilog
                cleanup
                    set is_active, is_complete, success (checked in scheduler)
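
A rough sketch of this control flow, as illustrative pseudocode only (the
Agent and Dispatcher classes live in monitor_db and differ in detail):

    # Once per scheduler tick, handle_agents walks the dispatcher's queue:
    for agent in dispatcher_queue:
        agent.tick()
        # Agent.tick drives its AgentTask, roughly:
        #     self.task.poll()          # BaseAgentTask.poll, defined below
        #     if self.task.is_done():
        #         <remove this agent from the dispatcher>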

The first special task for an HQE is usually Reset.
- poll: The first poll will start the task; polls thereafter will call the
        task's tick method. A started task will have the started bit set.
- start: Call prolog, run the process and set the started bit.
    - prolog: Usually where one puts any model state changes that happen before
              the actual task. Different per Task. Examples of things that might
              happen in a prolog:
                  - set the state of the Host/HQE (to something like Resetting)
                  - delete any unwanted queued special tasks
                  - register a pidfile
                  - set the is_active bit on the special task
    - run:
        - create a PidfileRunMonitor
        - pass the autoserv command, working directory etc. to the drone
          manager. This will start the actual autoserv process.
    - set the started bit: so subsequent polls do not 'start' again

- tick: For as long as a started task's done bit is not set, a poll will lead
        to a tick. The tick monitors the pidfile of the autoserv process
        running on the drone through the PidfileRunMonitor created in run.
        If the autoserv process has finished, we call finished with True/False
        depending on the autoserv exit code.

        - finished: sets the done and success values, then calls epilog. The
                    done bit is important because the Agent polls this bit to
                    measure the success or failure of its task.

            - epilog: is generally where we set the status of the Host/HQE
                      again, requeue any other task that needs to run after
                      this one and perform cleanup. Just like the prolog, this
                      step is different per task.

                      - cleanup: sets the is_active, is_complete and success
                                 states on the task's model. Also uses the
                                 drone_manager to:
                                    unregister the pidfile
                                    copy results of the task
                                 (Note: this is not to be confused with the
                                  special task called Cleanup.)

                      The actions we take in the epilog are based on the
                      success/failure of the autoserv process set in cleanup,
                      e.g. if Reset failed we will enqueue a Repair, but if all
                      is well the epilog will just return. Prejob task epilogs
                      also have an on_pending method that changes the status of
                      the HQE to pending/starting, which gets picked up in the
                      scheduler.

By this point the done flag is set, which results in the Agent noticing that
the task has finished and unregistering it from the dispatcher.

Class hierarchy:
BaseAgentTask
 |--->SpecialAgentTask (prejob_task.py)
      |--->RepairTask
      |--->PreJobTask
           |--->Verify, Cleanup, Reset, Provision

 |--->AbstractQueueTask (monitor_db.py)
      |--->QueueTask
      |--->HostlessQueueTask

 |--->PostJobTask (postjob_task.py)
      |--->GatherLogsTask
      |--->SelfThrottledPostJobTask
            |--->FinalReparseTask
            |--->ArchiveResultsTask

"""

import logging
import os
import urllib
import time

from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, pidfile_monitor
from autotest_lib.client.common_lib import utils
from autotest_lib.scheduler import email_manager, host_scheduler
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server import autoserv_utils


_drone_manager = drone_manager.instance()

AUTOSERV_NICE_LEVEL = 10


class BaseAgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
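        """Start the task on the first poll; tick it on later polls until done."""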
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
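        """Check the monitored process for an exit code.

        If the process has exited, mark the task finished; success is
        determined by a zero exit code.
        """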
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
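        """Mark the task started and done, record success, and run the epilog.

        Does nothing if the task is already done.
        """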
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
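        """Run the prolog and kick off the process if not already started."""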
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
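        """Kill the running process, if any, then mark the task as done and
        aborted and clean up."""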
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this BaseAgentTask's process.
        It may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this BaseAgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this BaseAgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this BaseAgentTask's process uses.  To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        if not self._paired_with_monitor().has_process():
            email_manager.manager.enqueue_notify_email(
                    'No paired results in task',
                    'No paired results in task %s at %s'
                    % (self, self._paired_with_monitor().pidfile_id))
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = pidfile_monitor.PidfileRunMonitor()


    def run(self):
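        """Create a PidfileRunMonitor and hand the command line off to the
        drone manager, which starts the actual autoserv process."""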
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(self):
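        """Return the drone hostnames this task may run on.

        Returns None when drone sets are disabled.
        """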
        if not models.DroneSet.drone_sets_enabled():
            return None

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warn('%s had no owner; using default drone set',
                         obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warn('User %s has no default drone set, using global '
                         'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
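        """Register this task's pidfile, and any paired pidfile, with the
        drone manager."""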
        pidfile_id = _drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        _drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            _drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
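        """Attach to an already-running autoserv process instead of starting
        a new one.

        If no process is found, the monitor is discarded so the task will be
        started normally on its next poll.
        """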
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
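        """Sanity-check queue entry and host statuses before starting.

        @param queue_entries: queue entries to check.
        @param allowed_hqe_statuses: acceptable HQE statuses.
        @param allowed_host_statuses: acceptable host statuses; host statuses
            are not checked if None.

        @raises host_scheduler.SchedulerError: if any entry or host is in a
            status outside the allowed set.
        """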
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise host_scheduler.SchedulerError(
                        '%s attempting to start entry with invalid status %s: '
                        '%s' % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise host_scheduler.SchedulerError(
                        '%s attempting to start on queue entry with invalid '
                        'host status %s: %s'
                        % (class_name, entry.host.status, entry))


SiteAgentTask = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteAgentTask', BaseAgentTask)

class AgentTask(SiteAgentTask):
    pass


class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._working_directory(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [ urllib.quote(label) for label in all_labels ]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = rdb_lib.get_hosts([task.host.id])[0]
        self.host.dbg_str = 'Task: %s' % str(task)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)
            self.host.dbg_str += self.queue_entry.get_dbg_str()

        self.task = task
        self._extra_command_args = extra_command_args


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return autoserv_utils._autoserv_command_line(self.host.hostname,
                                                     self._extra_command_args,
                                                     queue_entry=self.queue_entry)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self._actually_fail_queue_entry()


    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry.  The
    # exact semantics of when one should and should not be failing a queue
    # entry need to be worked out, because provisioning has placed us in a
    # case where we want to fail a queue entry that could be requeued,
    # which makes us fail the two above if statements, and thus
    # _fail_queue_entry() would exit early and have no effect.
    # What's left here with _actually_fail_queue_entry is a hack to be able to
    # bypass the checks and unconditionally execute the code.
    def _actually_fail_queue_entry(self):
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = _drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
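        """Finish the SpecialTask row, copy its results and unregister its
        pidfile."""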
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove queued special tasks of a given type for this task's host.

        Only tasks that are not active, not complete and not attached to a
        queue entry are removed.

        @param special_task_to_remove: type of special task to be removed,
            e.g. models.SpecialTask.Task.VERIFY.
        @param keep_last_one: if True, this task's own row is excluded from
            removal, even if its type matches special_task_to_remove.

        """
        queued_special_tasks = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=special_task_to_remove,
            is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()
637