agent_task.py revision 114e17228efd62ab595690be30cb1e3f26fabebe
1#pylint: disable-msg=C0111
2
3""" This is the module for everything related to the AgentTask.
4
5The BaseAgentTask imposes an interface through which the scheduler can monitor
6a processes; Examples of such processes include Verify, Cleanup and the Queue
7Tasks that run the tests. The scheduler itself only understands Agents.
8Agents:
9    The Agent is the bridge between the scheduler and the AgentTask. The
10    schedulers tick has a method called handle_agents, which calls the
11    tick of each agent in the Dispatchers queue. This leads to the Agent
12    polling its AgentTask. The scheduler will keep polling a task through
13    the associated Agent till the Agent is removed from the dispatcher.
14
15    At a high level:
16        agents finished = tasks done
17        agent polls till finished
18            task polls till done
19                task sets done
20        agent is removed from dispatcher
21AgentTasks:
22    Basic AgentTasks are created when an hqe changes state. Examples of these
23    are the QueueTask, which is created when a hqe goes into the Starting state
24    and the FinalReparseTask, which is created when the hqe goes into parsing.
25SpecialAgentTasks:
26    Unlike AgentTasks, SpecialAgentTasks are only created when a row is inserted
27    in the afe_special_tasks table. All PrejobTasks are SpecialAgentTasks.
28
29Monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
30an AgentTask to an Agent, which the scheduler understands. From this point
31onward, the scheduler manages the task through the Agents interface,as follows:
32At a high level:
33    task poll
34        start
35            prolog
36        tick till we get an exit code
37        finished(exit==0)
38            done=True
39            epilog
40                cleanup
41                    set is_active, is_complete, success (checked in scheduler)
42
43The first special task for an HQE is usually Reset.
44-poll: The first poll will start the task, polls thereafter will call the tasks
45       tick method. A started task will have the started bit set.
46- start: Call prolog, run the process and set the start bit.
47    - prolog: Usually where one puts any model state changes that happen before
48              the actual task. Different per Task. Examples of things that might
49              happen in a prolog:
50                  - state of Host, HQE (to something like Resetting)
51                  - delete any unwanted queued special tasks
52                  - register a pidfile
53                  - set the is_active bit on the special task
54    - run:
55        - create a PidfileRunMonitor
56        - pass the autoserv command, working directory etc to drone manager.
57          This will start the actual autoserv process.
58   - set the start bit: so subsequent polls do not 'start' again
59
60- tick: For as long as a started tasks done bit is not set, a poll will lead
61        to a tick. The tick monitors the pid file of the autoserv process
62        running on the drone through the PidfileRunMonitor created in prolog.
63        If the autoserv process has finished we call finished with true/false
64        depending on autoserv exit code.
65
66        - finished: sets the done and success values, then calls epilog. The
67                    done bit is important because the Agent polls this bit to
68                    measure the success or failure of its task.
69
70            - epilog: Is generally where we set status of the Host/HQE again,
71                      requeue any other task that needs to run after this one
72                      and perform cleanup. Just like the prolog, this step is
73                      different per task.
74
75                      - cleanup: Sets the is_active and is_complete and success
76                                 states on the tasks model. Also uses the
77                                 drone_manager to:
78                                    unregister the pidfile
79                                    copy results of the task
80                                 (Note this is not to be confused with the
81                                  special task called cleanup).
82
83                      The actions we take in the epilog are based on the
84                      success/failure of the autoserv process set in cleanup,
85                      eg: if reset failed we will enqueue a repair, but if all
86                      is well the epilog will just return. Prejob task epilogs
87                      also have an on_pending method that change the status of
88                      the HQE to pending/starting, which gets picked up in the
89                      scheduler.
90By this point the is_done flag is set, which results in the Agent noticing that
91the task has finished and unregistering it from the dispatcher.Class hierarchy:
92BaseAgentTask
93 |--->SpecialAgentTask (prejob_task.py)
94      |--->RepairTask
95      |--->PreJobTask
96           |--->Verify, Cleanup, Reset, Provision
97
98 |--->AbstractQueueTask (monitor_db.py)
99      |--->QueueTask
100      |--->HostlessQueueTask
101
102 |--->PostJobTask (postjob_task.py)
103      |--->GatherLogsTask
104      |--->SelfThrottledPostJobTask
105            |--->FinalReparseTask
106            |--->ArchiveResultsTask
107
108"""
109
110import logging
111import os
112import urllib
113import time
114
115from autotest_lib.client.common_lib import utils
116from autotest_lib.client.common_lib.cros.graphite import autotest_stats
117from autotest_lib.frontend.afe import models
118from autotest_lib.scheduler import drone_manager, pidfile_monitor
119from autotest_lib.scheduler import scheduler_lib
120from autotest_lib.scheduler import rdb_lib
121from autotest_lib.scheduler import scheduler_models
122from autotest_lib.server import autoserv_utils
123from autotest_lib.server import system_utils
124
125
126AUTOSERV_NICE_LEVEL = 10
127
128
129class BaseAgentTask(object):
130    class _NullMonitor(object):
131        pidfile_id = None
132
133        def has_process(self):
134            return True
135
136
137    def __init__(self, log_file_name=None):
138        """
139        @param log_file_name: (optional) name of file to log command output to
140        """
141        self._drone_manager = drone_manager.instance()
142        self.done = False
143        self.started = False
144        self.success = None
145        self.aborted = False
146        self.monitor = None
147        self.queue_entry_ids = []
148        self.host_ids = []
149        # A map between host id and hostname.
150        self.hostnames = {}
151        self._log_file_name = log_file_name
152
153
154    def _set_ids(self, host=None, queue_entries=None):
155        if queue_entries and queue_entries != [None]:
156            self.host_ids = [entry.host.id for entry in queue_entries]
157            self.queue_entry_ids = [entry.id for entry in queue_entries]
158            self.hostnames = dict((entry.host.id, entry.host.hostname)
159                                  for entry in queue_entries)
160        else:
161            assert host
162            self.host_ids = [host.id]
163            self.hostnames = {host.id: host.hostname}
164
165
166    def poll(self):
167        if not self.started:
168            self.start()
169        if not self.done:
170            self.tick()
171
172
173    def tick(self):
174        assert self.monitor
175        exit_code = self.monitor.exit_code()
176        if exit_code is None:
177            return
178
179        success = (exit_code == 0)
180        self.finished(success)
181
182
183    def is_done(self):
184        return self.done
185
186
187    def finished(self, success):
188        if self.done:
189            assert self.started
190            return
191        self.started = True
192        self.done = True
193        self.success = success
194        self.epilog()
195
196
197    def prolog(self):
198        """
199        To be overridden.
200        """
201        assert not self.monitor
202        self.register_necessary_pidfiles()
203
204
205    def _log_file(self):
206        if not self._log_file_name:
207            return None
208        return os.path.join(self._working_directory(), self._log_file_name)
209
210
211    def cleanup(self):
212        log_file = self._log_file()
213        if self.monitor and log_file:
214            self.monitor.try_copy_to_results_repository(log_file)
215
216
217    def epilog(self):
218        """
219        To be overridden.
220        """
221        self.cleanup()
222        logging.info("%s finished with success=%s", type(self).__name__,
223                     self.success)
224
225
226    def start(self):
227        if not self.started:
228            self.prolog()
229            self.run()
230
231        self.started = True
232
233
234    def abort(self):
235        if self.monitor:
236            self.monitor.kill()
237        self.done = True
238        self.aborted = True
239        self.cleanup()
240
241
242    def _get_consistent_execution_path(self, execution_entries):
243        first_execution_path = execution_entries[0].execution_path()
244        for execution_entry in execution_entries[1:]:
245            assert execution_entry.execution_path() == first_execution_path, (
246                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
247                                        execution_entry,
248                                        first_execution_path,
249                                        execution_entries[0]))
250        return first_execution_path
251
252
253    def _copy_results(self, execution_entries, use_monitor=None):
254        """
255        @param execution_entries: list of objects with execution_path() method
256        """
257        if use_monitor is not None and not use_monitor.has_process():
258            return
259
260        assert len(execution_entries) > 0
261        if use_monitor is None:
262            assert self.monitor
263            use_monitor = self.monitor
264        assert use_monitor.has_process()
265        execution_path = self._get_consistent_execution_path(execution_entries)
266        results_path = execution_path + '/'
267        use_monitor.try_copy_to_results_repository(results_path)
268
269
270    def _parse_results(self, queue_entries):
271        for queue_entry in queue_entries:
272            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
273
274
275    def _archive_results(self, queue_entries):
276        for queue_entry in queue_entries:
277            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
278
279
280    def _command_line(self):
281        """
282        Return the command line to run.  Must be overridden.
283        """
284        raise NotImplementedError
285
286
287    @property
288    def num_processes(self):
289        """
290        Return the number of processes forked by this BaseAgentTask's process.
291        It may only be approximate.  To be overridden if necessary.
292        """
293        return 1
294
295
296    def _paired_with_monitor(self):
297        """
298        If this BaseAgentTask's process must run on the same machine as some
299        previous process, this method should be overridden to return a
300        PidfileRunMonitor for that process.
301        """
302        return self._NullMonitor()
303
304
305    @property
306    def owner_username(self):
307        """
308        Return login of user responsible for this task.  May be None.  Must be
309        overridden.
310        """
311        raise NotImplementedError
312
313
314    def _working_directory(self):
315        """
316        Return the directory where this BaseAgentTask's process executes.
317        Must be overridden.
318        """
319        raise NotImplementedError
320
321
322    def _pidfile_name(self):
323        """
324        Return the name of the pidfile this BaseAgentTask's process uses.  To be
325        overridden if necessary.
326        """
327        return drone_manager.AUTOSERV_PID_FILE
328
329
330    def _check_paired_results_exist(self):
331        if not self._paired_with_monitor().has_process():
332            metadata = {
333                    '_type': 'scheduler_error',
334                    'error': 'No paired results in task',
335                    'task': str(self),
336                    'pidfile_id': str(self._paired_with_monitor().pidfile_id)}
337            autotest_stats.Counter('no_paired_results_in_task',
338                                   metadata=metadata).increment()
339            self.finished(False)
340            return False
341        return True
342
343
344    def _create_monitor(self):
345        assert not self.monitor
346        self.monitor = pidfile_monitor.PidfileRunMonitor()
347
348
349    def run(self):
350        if not self._check_paired_results_exist():
351            return
352
353        self._create_monitor()
354        self.monitor.run(
355                self._command_line(), self._working_directory(),
356                num_processes=self.num_processes,
357                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
358                pidfile_name=self._pidfile_name(),
359                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
360                username=self.owner_username,
361                drone_hostnames_allowed=self.get_drone_hostnames_allowed())
362
363
364    def get_drone_hostnames_allowed(
365            self, restricted_subnets=utils.RESTRICTED_SUBNETS):
366        filtered_drones = None
367        has_unrestricted_host = False
368        if self.hostnames and restricted_subnets:
369            for hostname in self.hostnames.values():
370                subnet = utils.get_restricted_subnet(hostname,
371                                                     restricted_subnets)
372
373                # Return an empty set if the list of hosts exists both in
374                # restricted and unrestricted subnet. No drone can work in such
375                # case.
376                if ((not subnet and filtered_drones is not None) or
377                    (subnet and has_unrestricted_host)):
378                    logging.error('The test has some DUT in restricted subnet, '
379                                  'but some in unrestricted subnet. Therefore, '
380                                  'no drone is available to run the test.')
381                    return set()
382
383                if not subnet:
384                    has_unrestricted_host = True
385                    continue
386
387                server_ip_map=system_utils.DroneCache.get_drone_ip_map()
388                filtered_drones_for_host = set(
389                        utils.get_servers_in_same_subnet(
390                                subnet[0], subnet[1],
391                                server_ip_map=server_ip_map))
392                logging.info('DUT %s is in restricted subnet, drone can only '
393                             'be chosen from %s', hostname,
394                             filtered_drones_for_host)
395                if filtered_drones is None:
396                    filtered_drones = filtered_drones_for_host
397                else:
398                    filtered_drones = set.intersection(
399                            filtered_drones, filtered_drones_for_host)
400
401                # If filtered_drones is an empty set, that means no drone is
402                # allowed to run the task. This is different fron None, which
403                # means all drones are allowed.
404                if filtered_drones == set():
405                    logging.error('DUT(s) is in restricted subnet, but no '
406                                  'drone is available to run the test.')
407                    return filtered_drones
408
409        # If host is not in restricted subnet, use the unrestricted drones only.
410        if filtered_drones is None and restricted_subnets:
411            filtered_drones = set(
412                    system_utils.DroneCache.get_unrestricted_drones(
413                            restricted_subnets=restricted_subnets))
414
415        if not models.DroneSet.drone_sets_enabled():
416            return filtered_drones
417
418        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
419        if not hqes:
420            # Only special tasks could be missing host queue entries
421            assert isinstance(self, SpecialAgentTask)
422            return self._user_or_global_default_drone_set(
423                    self.task, self.task.requested_by)
424
425        job_ids = hqes.values_list('job', flat=True).distinct()
426        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
427                                      "span multiple jobs")
428
429        job = models.Job.objects.get(id=job_ids[0])
430        drone_set = job.drone_set
431        if not drone_set:
432            return self._user_or_global_default_drone_set(job, job.user())
433
434        if filtered_drones:
435            return set.intersection(filtered_drones,
436                                    drone_set.get_drone_hostnames())
437        else:
438            return drone_set.get_drone_hostnames()
439
440
441    def _user_or_global_default_drone_set(self, obj_with_owner, user):
442        """
443        Returns the user's default drone set, if present.
444
445        Otherwise, returns the global default drone set.
446        """
447        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
448        if not user:
449            logging.warning('%s had no owner; using default drone set',
450                         obj_with_owner)
451            return default_hostnames
452        if not user.drone_set:
453            logging.warning('User %s has no default drone set, using global '
454                         'default', user.login)
455            return default_hostnames
456        return user.drone_set.get_drone_hostnames()
457
458
459    def register_necessary_pidfiles(self):
460        pidfile_id = self._drone_manager.get_pidfile_id_from(
461                self._working_directory(), self._pidfile_name())
462        self._drone_manager.register_pidfile(pidfile_id)
463
464        paired_pidfile_id = self._paired_with_monitor().pidfile_id
465        if paired_pidfile_id:
466            self._drone_manager.register_pidfile(paired_pidfile_id)
467
468
469    def recover(self):
470        if not self._check_paired_results_exist():
471            return
472
473        self._create_monitor()
474        self.monitor.attach_to_existing_process(
475                self._working_directory(), pidfile_name=self._pidfile_name(),
476                num_processes=self.num_processes)
477        if not self.monitor.has_process():
478            # no process to recover; wait to be started normally
479            self.monitor = None
480            return
481
482        self.started = True
483        logging.info('Recovering process %s for %s at %s',
484                     self.monitor.get_process(), type(self).__name__,
485                     self._working_directory())
486
487
488    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
489                                    allowed_host_statuses=None):
490        class_name = self.__class__.__name__
491        for entry in queue_entries:
492            if entry.status not in allowed_hqe_statuses:
493                raise scheduler_lib.SchedulerError(
494                        '%s attempting to start entry with invalid status %s: '
495                        '%s' % (class_name, entry.status, entry))
496            invalid_host_status = (
497                    allowed_host_statuses is not None
498                    and entry.host.status not in allowed_host_statuses)
499            if invalid_host_status:
500                raise scheduler_lib.SchedulerError(
501                        '%s attempting to start on queue entry with invalid '
502                        'host status %s: %s'
503                        % (class_name, entry.host.status, entry))
504
505
506SiteAgentTask = utils.import_site_class(
507    __file__, 'autotest_lib.scheduler.site_monitor_db',
508    'SiteAgentTask', BaseAgentTask)
509
510class AgentTask(SiteAgentTask):
511    pass
512
513
514class TaskWithJobKeyvals(object):
515    """AgentTask mixin providing functionality to help with job keyval files."""
516    _KEYVAL_FILE = 'keyval'
517    def _format_keyval(self, key, value):
518        return '%s=%s' % (key, value)
519
520
521    def _keyval_path(self):
522        """Subclasses must override this"""
523        raise NotImplementedError
524
525
526    def _write_keyval_after_job(self, field, value):
527        assert self.monitor
528        if not self.monitor.has_process():
529            return
530        self._drone_manager.write_lines_to_file(
531            self._keyval_path(), [self._format_keyval(field, value)],
532            paired_with_process=self.monitor.get_process())
533
534
535    def _job_queued_keyval(self, job):
536        return 'job_queued', int(time.mktime(job.created_on.timetuple()))
537
538
539    def _write_job_finished(self):
540        self._write_keyval_after_job("job_finished", int(time.time()))
541
542
543    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
544        keyval_contents = '\n'.join(self._format_keyval(key, value)
545                                    for key, value in keyval_dict.iteritems())
546        # always end with a newline to allow additional keyvals to be written
547        keyval_contents += '\n'
548        self._drone_manager.attach_file_to_execution(self._working_directory(),
549                                                keyval_contents,
550                                                file_path=keyval_path)
551
552
553    def _write_keyvals_before_job(self, keyval_dict):
554        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
555
556
557    def _write_host_keyvals(self, host):
558        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
559                                   host.hostname)
560        platform, all_labels = host.platform_and_labels()
561        all_labels = [ urllib.quote(label) for label in all_labels ]
562        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
563        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
564
565
566class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
567    """
568    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
569    """
570
571    TASK_TYPE = None
572    host = None
573    queue_entry = None
574
575    def __init__(self, task, extra_command_args):
576        super(SpecialAgentTask, self).__init__()
577
578        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
579
580        self.host = rdb_lib.get_hosts([task.host.id])[0]
581        self.host.dbg_str = 'Task: %s' % str(task)
582        self.queue_entry = None
583        if task.queue_entry:
584            self.queue_entry = scheduler_models.HostQueueEntry(
585                    id=task.queue_entry.id)
586            self.host.dbg_str += self.queue_entry.get_dbg_str()
587
588        self.task = task
589        self._extra_command_args = extra_command_args
590        self.host.metadata = self.get_metadata()
591
592
593    def get_metadata(self):
594        """Get a dictionary that contains task information.
595
596        The return value is a dictionary that includes task information like id,
597        name and related job information. The value will be stored in metadata
598        database.
599        @return: A dictionary containing the task id, name and related job id.
600                 If some attributes are failed to be accessed, an empty
601                 dictionary will be returned, and error will be logged.
602        """
603        try:
604            metadata = {'task_id':self.task.id, 'task_name':self.task.task,
605                        'hostname':self.task.host.hostname}
606            if self.task.queue_entry:
607                job = self.task.queue_entry.job
608                metadata.update(
609                        scheduler_models.get_job_metadata(job))
610            return metadata
611        except AttributeError as e:
612            logging.error('Task has missing attribute: %s', e)
613            return {}
614
615
616    def _keyval_path(self):
617        return os.path.join(self._working_directory(), self._KEYVAL_FILE)
618
619
620    def _command_line(self):
621        return autoserv_utils._autoserv_command_line(self.host.hostname,
622                                                     self._extra_command_args,
623                                                     queue_entry=self.queue_entry,
624                                                     in_lab=True)
625
626
627    def _working_directory(self):
628        return self.task.execution_path()
629
630
631    @property
632    def owner_username(self):
633        if self.task.requested_by:
634            return self.task.requested_by.login
635        return None
636
637
638    def prolog(self):
639        super(SpecialAgentTask, self).prolog()
640        self.task.activate()
641        self._write_host_keyvals(self.host)
642
643
644    def _fail_queue_entry(self):
645        assert self.queue_entry
646
647        if self.queue_entry.meta_host:
648            return # don't fail metahost entries, they'll be reassigned
649
650        self.queue_entry.update_from_database()
651        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
652            return # entry has been aborted
653
654        self._actually_fail_queue_entry()
655
656
657    # TODO(milleral): http://crbug.com/268607
658    # All this used to be a part of _fail_queue_entry.  The
659    # exact semantics of when one should and should not be failing a queue
660    # entry need to be worked out, because provisioning has placed us in a
661    # case where we want to fail a queue entry that could be requeued,
662    # which makes us fail the two above if statements, and thus
663    # _fail_queue_entry() would exit early and have no effect.
664    # What's left here with _actually_fail_queue_entry is a hack to be able to
665    # bypass the checks and unconditionally execute the code.
666    def _actually_fail_queue_entry(self):
667        self.queue_entry.set_execution_subdir()
668        queued_key, queued_time = self._job_queued_keyval(
669            self.queue_entry.job)
670        self._write_keyval_after_job(queued_key, queued_time)
671        self._write_job_finished()
672
673        # copy results logs into the normal place for job results
674        self.monitor.try_copy_results_on_drone(
675                source_path=self._working_directory() + '/',
676                destination_path=self.queue_entry.execution_path() + '/')
677
678        pidfile_id = self._drone_manager.get_pidfile_id_from(
679                self.queue_entry.execution_path(),
680                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
681        self._drone_manager.register_pidfile(pidfile_id)
682
683        if self.queue_entry.job.parse_failed_repair:
684            self._parse_results([self.queue_entry])
685        else:
686            self._archive_results([self.queue_entry])
687
688        # Also fail all other special tasks that have not yet run for this HQE
689        pending_tasks = models.SpecialTask.objects.filter(
690                queue_entry__id=self.queue_entry.id,
691                is_complete=0)
692        for task in pending_tasks:
693            task.finish(False)
694
695
696    def cleanup(self):
697        super(SpecialAgentTask, self).cleanup()
698
699        # We will consider an aborted task to be "Failed"
700        self.task.finish(bool(self.success))
701
702        if self.monitor:
703            if self.monitor.has_process():
704                self._copy_results([self.task])
705            if self.monitor.pidfile_id is not None:
706                self._drone_manager.unregister_pidfile(self.monitor.pidfile_id)
707
708
709    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
710        """Remove a type of special task in all tasks, keep last one if needed.
711
712        @param special_task_to_remove: type of special task to be removed, e.g.,
713            models.SpecialTask.Task.VERIFY.
714        @param keep_last_one: True to keep the last special task if its type is
715            the same as of special_task_to_remove.
716
717        """
718        queued_special_tasks = models.SpecialTask.objects.filter(
719            host__id=self.host.id,
720            task=special_task_to_remove,
721            is_active=False, is_complete=False, queue_entry=None)
722        if keep_last_one:
723            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
724        queued_special_tasks.delete()
725
726
727    def _generate_autoserv_label_args(self, task):
728        """
729        @param task: An instance of afe model's SpecialTask.
730        @returns: The list of arguments to pass to autoserv to tell it what the
731                  labels of a job are.
732
733        """
734        labels = {x.name for x in task.queue_entry.job.labels}
735        return ['--job-labels', ','.join(labels)]
736