# agent_task.py revision e6803234ac5233c48ff50b57460d42641e04a52b
#pylint: disable-msg=C0111

"""This is the module for everything related to the AgentTask.

The BaseAgentTask imposes an interface through which the scheduler can monitor
a process. Examples of such processes include Verify, Cleanup and the
QueueTasks that run the tests. The scheduler itself only understands Agents.
Agents:
    The Agent is the bridge between the scheduler and the AgentTask. On each
    scheduler tick, handle_agents calls the tick of each agent in the
    dispatcher's queue. This leads to the Agent polling its AgentTask. The
    scheduler keeps polling a task through the associated Agent until the
    Agent is removed from the dispatcher.

    At a high level:
        agents finished = tasks done
        agent polls till finished
            task polls till done
                task sets done
        agent is removed from dispatcher
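
The loop above can be modelled with a few plain Python classes. This is a
simplified sketch, not the scheduler's code. Dispatcher, Agent and
CountdownTask are hypothetical stand-ins that keep only the poll/done
relationship described above.

```python
class CountdownTask(object):
    # Hypothetical task: reports done after a fixed number of polls.
    def __init__(self, polls_needed):
        self.polls_needed = polls_needed
        self.done = False

    def poll(self):
        self.polls_needed -= 1
        if self.polls_needed <= 0:
            self.done = True

    def is_done(self):
        return self.done


class Agent(object):
    # Hypothetical Agent: the bridge between the dispatcher and one task.
    def __init__(self, task):
        self.task = task

    def tick(self):
        # Each agent tick polls the task it owns.
        self.task.poll()

    def is_done(self):
        return self.task.is_done()


class Dispatcher(object):
    # Hypothetical dispatcher holding the queue of agents.
    def __init__(self):
        self.agents = []

    def add_agent(self, agent):
        self.agents.append(agent)

    def handle_agents(self):
        # Tick every agent, then drop the ones whose task is done.
        for agent in list(self.agents):
            agent.tick()
        self.agents = [a for a in self.agents if not a.is_done()]

    def tick(self):
        self.handle_agents()


dispatcher = Dispatcher()
dispatcher.add_agent(Agent(CountdownTask(polls_needed=3)))
ticks = 0
while dispatcher.agents:
    dispatcher.tick()
    ticks += 1
print(ticks)
```

The agent is removed only once its task reports done. That is what lets a
single task span many scheduler ticks.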
AgentTasks:
    Basic AgentTasks are created when an HQE (HostQueueEntry) changes state.
    Examples of these are the QueueTask, which is created when an HQE goes
    into the Starting state, and the FinalReparseTask, which is created when
    the HQE goes into parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is
    inserted in the afe_special_tasks table. All PreJobTasks are
    SpecialAgentTasks.

monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface.
At a high level:
    task poll
        start
            prolog
            run
        tick till we get an exit code
        finished(exit==0)
            done=True
            epilog
                cleanup
                    set is_active, is_complete, success (checked in scheduler)
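
A toy task that follows the same call order makes this flow concrete.
FakeMonitor and SketchTask are hypothetical. FakeMonitor stands in for
PidfileRunMonitor by returning no exit code for a fixed number of ticks.
SketchTask mirrors the poll/start/tick/finished/epilog order of BaseAgentTask
and records which hooks run. No drone manager or pidfile is involved.

```python
class FakeMonitor(object):
    # Hypothetical stand-in for PidfileRunMonitor: no exit code until it
    # has been asked `running_ticks` times.
    def __init__(self, running_ticks, exit_code):
        self.running_ticks = running_ticks
        self._exit_code = exit_code

    def exit_code(self):
        if self.running_ticks > 0:
            self.running_ticks -= 1
            return None
        return self._exit_code


class SketchTask(object):
    # Follows the same hook order as BaseAgentTask, recording each call.
    def __init__(self, running_ticks, exit_code):
        self.started = False
        self.done = False
        self.success = None
        self.monitor = None
        self.calls = []
        self._running_ticks = running_ticks
        self._exit_code = exit_code

    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()

    def start(self):
        self.prolog()
        self.run()
        self.started = True

    def prolog(self):
        self.calls.append('prolog')

    def run(self):
        # The monitor is created when the process is launched.
        self.calls.append('run')
        self.monitor = FakeMonitor(self._running_ticks, self._exit_code)

    def tick(self):
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return  # process still running; try again on the next poll
        self.finished(exit_code == 0)

    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()

    def epilog(self):
        self.calls.append('epilog')
        self.cleanup()

    def cleanup(self):
        self.calls.append('cleanup')


task = SketchTask(running_ticks=2, exit_code=0)
polls = 0
while not task.done:
    task.poll()
    polls += 1
print(task.calls)
```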

The first special task for an HQE is usually Reset.
- poll: The first poll starts the task; subsequent polls call the task's
        tick method. A started task has the started bit set.
- start: Call prolog, run the process and set the started bit.
    - prolog: Usually where one puts any model state changes that happen
              before the actual task. This differs per task. Examples of
              things that might happen in a prolog:
                  - set the state of the Host and HQE (to something like
                    Resetting)
                  - delete any unwanted queued special tasks
                  - register a pidfile
                  - set the is_active bit on the special task
    - run:
        - create a PidfileRunMonitor
        - pass the autoserv command, working directory, etc. to the drone
          manager. This starts the actual autoserv process.
    - set the started bit, so that subsequent polls do not 'start' again.

- tick: For as long as a started task's done bit is not set, a poll leads
        to a tick. The tick monitors the pidfile of the autoserv process
        running on the drone through the PidfileRunMonitor created in run.
        If the autoserv process has finished, we call finished with
        True/False depending on the autoserv exit code.

        - finished: sets the done and success values, then calls epilog. The
                    done bit is important because the Agent polls it to
                    determine whether its task has finished.

            - epilog: Generally where we set the status of the Host/HQE
                      again, requeue any other task that needs to run after
                      this one, and perform cleanup. Just like the prolog,
                      this step differs per task.

                      - cleanup: Sets the is_active, is_complete and success
                                 states on the task's model. Also uses the
                                 drone_manager to:
                                    unregister the pidfile
                                    copy results of the task
                                 (Not to be confused with the special task
                                  called Cleanup.)

                      The actions we take in the epilog are based on the
                      success or failure of the autoserv process (recorded
                      on the task model in cleanup). For example, if reset
                      failed we enqueue a repair, but if all is well the
                      epilog just returns. Prejob task epilogs also rely on
                      an on_pending method that changes the status of the
                      HQE to Pending/Starting, which the scheduler then
                      picks up.
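
The success-dependent branch of an epilog can be sketched as follows. The
sketch assumes a Reset-like task whose failure should queue a Repair.
SketchResetTask is hypothetical, and a plain list stands in for the
afe_special_tasks table. The real prejob epilogs (prejob_task.py) work
against the AFE models instead.

```python
class SketchResetTask(object):
    # Hypothetical, simplified Reset-like task; queued_tasks is a plain
    # list standing in for the afe_special_tasks table.
    def __init__(self, host_id, queued_tasks):
        self.host_id = host_id
        self.queued_tasks = queued_tasks
        self.success = None
        self.cleaned_up = False

    def cleanup(self):
        # Stands in for recording is_active/is_complete/success on the model.
        self.cleaned_up = True

    def epilog(self):
        self.cleanup()
        if self.success:
            return  # all is well; nothing more to schedule
        # Reset failed: queue a repair for the same host.
        self.queued_tasks.append(('Repair', self.host_id))


queue = []
ok = SketchResetTask(host_id=1, queued_tasks=queue)
ok.success = True
ok.epilog()

failed = SketchResetTask(host_id=2, queued_tasks=queue)
failed.success = False
failed.epilog()
print(queue)
```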
By this point the done flag (reported by is_done()) is set, which results in
the Agent noticing that the task has finished and unregistering it from the
dispatcher.

Class hierarchy:
BaseAgentTask
 |--->SpecialAgentTask (prejob_task.py)
      |--->RepairTask
      |--->PreJobTask
           |--->Verify, Cleanup, Reset, Provision

 |--->AbstractQueueTask (monitor_db.py)
      |--->QueueTask
      |--->HostlessQueueTask

 |--->PostJobTask (postjob_task.py)
      |--->GatherLogsTask
      |--->SelfThrottledPostJobTask
            |--->FinalReparseTask
            |--->ArchiveResultsTask

"""

import logging
import os
import time
import urllib

from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server import autoserv_utils
from autotest_lib.server import system_utils

CONFIG = global_config.global_config
AUTOSERV_NICE_LEVEL = 10

ENABLE_DRONE_IN_RESTRICTED_SUBNET = CONFIG.get_config_value(
        'CROS', 'enable_drone_in_restricted_subnet', type=bool,
        default=False)


class BaseAgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self._drone_manager = drone_manager.instance()
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        # A map between host id and hostname.
        self.hostnames = {}
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
            self.hostnames = dict((entry.host.id, entry.host.hostname)
                                  for entry in queue_entries)
        else:
            assert host
            self.host_ids = [host.id]
            self.hostnames = {host.id: host.hostname}


    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: (optional) PidfileRunMonitor to copy the results
                with; defaults to self.monitor
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this BaseAgentTask's process.
        It may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this BaseAgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this BaseAgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this BaseAgentTask's process uses.  To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        if not self._paired_with_monitor().has_process():
            metadata = {
                    '_type': 'scheduler_error',
                    'error': 'No paired results in task',
                    'task': str(self),
                    'pidfile_id': str(self._paired_with_monitor().pidfile_id)}
            autotest_stats.Counter('no_paired_results_in_task',
                                   metadata=metadata).increment()
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = pidfile_monitor.PidfileRunMonitor()


    def run(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(
            self, restricted_subnets=utils.RESTRICTED_SUBNETS,
            enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET):
        filtered_drones = None
        has_unrestricted_host = False
        if (self.hostnames and restricted_subnets and enable_drone_in_subnet):
            for hostname in self.hostnames.values():
                subnet = utils.get_restricted_subnet(hostname,
                                                     restricted_subnets)

                # Return an empty set if the hosts span both restricted and
                # unrestricted subnets. No drone can work in such a case.
                if ((not subnet and filtered_drones is not None) or
                    (subnet and has_unrestricted_host)):
                    logging.error('The test has some DUT in restricted subnet, '
                                  'but some in unrestricted subnet. Therefore, '
                                  'no drone is available to run the test.')
                    return set()

                if not subnet:
                    has_unrestricted_host = True
                    continue

                server_ip_map = system_utils.DroneCache.get_drone_ip_map()
                filtered_drones_for_host = set(
                        utils.get_servers_in_same_subnet(
                                subnet[0], subnet[1],
                                server_ip_map=server_ip_map))
                logging.info('DUT %s is in restricted subnet, drone can only '
                             'be chosen from %s', hostname,
                             filtered_drones_for_host)
                if filtered_drones is None:
                    filtered_drones = filtered_drones_for_host
                else:
                    filtered_drones = set.intersection(
                            filtered_drones, filtered_drones_for_host)

                # If filtered_drones is an empty set, that means no drone is
                # allowed to run the task. This is different from None, which
                # means all drones are allowed.
                if filtered_drones == set():
                    logging.error('DUT(s) is in restricted subnet, but no '
                                  'drone is available to run the test.')
                    return filtered_drones

        # If no host is in a restricted subnet, use the unrestricted drones
        # only.
        if (filtered_drones is None and restricted_subnets and
            enable_drone_in_subnet):
            filtered_drones = set(
                    system_utils.DroneCache.get_unrestricted_drones(
                            restricted_subnets=restricted_subnets))

        if not models.DroneSet.drone_sets_enabled():
            return filtered_drones

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        if filtered_drones:
            return set.intersection(filtered_drones,
                                    drone_set.get_drone_hostnames())
        else:
            return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warning('%s had no owner; using default drone set',
                            obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warning('User %s has no default drone set, using global '
                            'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        self._drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            self._drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start entry with invalid status %s: '
                        '%s' % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start on queue entry with invalid '
                        'host status %s: %s'
                        % (class_name, entry.host.status, entry))


SiteAgentTask = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteAgentTask', BaseAgentTask)

class AgentTask(SiteAgentTask):
    pass


class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        assert self.monitor
        if not self.monitor.has_process():
            return
        self._drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        self._drone_manager.attach_file_to_execution(
                self._working_directory(), keyval_contents,
                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [urllib.quote(label) for label in all_labels]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = rdb_lib.get_hosts([task.host.id])[0]
        self.host.dbg_str = 'Task: %s' % str(task)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)
            self.host.dbg_str += self.queue_entry.get_dbg_str()

        self.task = task
        self._extra_command_args = extra_command_args
        self.host.metadata = self.get_metadata()


    def get_metadata(self):
        """Get a dictionary that contains task information.

        The return value is a dictionary that includes task information like
        id, name and related job information. The value will be stored in the
        metadata database.
        @return: A dictionary containing the task id, task name, hostname and,
                 if the task has a queue entry, the related job's metadata.
                 If any attribute cannot be accessed, an empty dictionary is
                 returned and the error is logged.
        """
        try:
            metadata = {'task_id': self.task.id, 'task_name': self.task.task,
                        'hostname': self.task.host.hostname}
            if self.task.queue_entry:
                job = self.task.queue_entry.job
                metadata.update(
                        scheduler_models.get_job_metadata(job))
            return metadata
        except AttributeError as e:
            logging.error('Task has missing attribute: %s', e)
            return {}


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return autoserv_utils._autoserv_command_line(
                self.host.hostname, self._extra_command_args,
                queue_entry=self.queue_entry, in_lab=True)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return  # entry has been aborted

        self._actually_fail_queue_entry()


    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry.  The
    # exact semantics of when one should and should not be failing a queue
    # entry need to be worked out, because provisioning has placed us in a
    # case where we want to fail a queue entry that could be requeued,
    # which makes us fail the two if statements above, and thus
    # _fail_queue_entry() would exit early and have no effect.
    # What's left here with _actually_fail_queue_entry is a hack to be able to
    # bypass the checks and unconditionally execute the code.
    def _actually_fail_queue_entry(self):
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        self._drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                self._drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove queued special tasks of one type for this host.

        Only tasks that are neither active nor complete and that are not tied
        to a queue entry are removed.

        @param special_task_to_remove: type of special task to be removed, e.g.,
            models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to keep the current task (self.task) if its
            type is the same as special_task_to_remove.

        """
        queued_special_tasks = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=special_task_to_remove,
            is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()


    def _generate_autoserv_label_args(self, task):
        """
        @param task: An instance of afe model's SpecialTask.
        @returns: The list of arguments to pass to autoserv to tell it what the
                  labels of a job are.

        """
        labels = {x.name for x in task.queue_entry.job.labels}
        return ['--job-labels', ','.join(labels)]
743