agent_task.py revision 114e17228efd62ab595690be30cb1e3f26fabebe
#pylint: disable-msg=C0111

"""This is the module for everything related to the AgentTask.

The BaseAgentTask imposes an interface through which the scheduler can monitor
processes; examples of such processes include Verify, Cleanup and the Queue
Tasks that run the tests. The scheduler itself only understands Agents.
Agents:
    The Agent is the bridge between the scheduler and the AgentTask. The
    scheduler's tick has a method called handle_agents, which calls the
    tick of each agent in the Dispatcher's queue. This leads to the Agent
    polling its AgentTask. The scheduler will keep polling a task through
    the associated Agent till the Agent is removed from the dispatcher.

    At a high level:
        agents finished = tasks done
            agent polls till finished
                task polls till done
                    task sets done
            agent is removed from dispatcher
AgentTasks:
    Basic AgentTasks are created when an hqe changes state. Examples of these
    are the QueueTask, which is created when an hqe goes into the Starting
    state, and the FinalReparseTask, which is created when the hqe goes into
    Parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is
    inserted in the afe_special_tasks table. All PrejobTasks are
    SpecialAgentTasks.

monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface, as
follows.

At a high level:
    task poll
        start
            prolog
        tick till we get an exit code
        finished(exit==0)
            done=True
            epilog
                cleanup
                    set is_active, is_complete, success (checked in scheduler)

The first special task for an HQE is usually Reset.
- poll: The first poll will start the task; polls thereafter will call the
        task's tick method. A started task will have the started bit set.
- start: Call prolog, run the process and set the start bit.
    - prolog: Usually where one puts any model state changes that happen
              before the actual task. Different per task. Examples of things
              that might happen in a prolog:
                  - change the state of the Host/HQE (to something like
                    Resetting)
                  - delete any unwanted queued special tasks
                  - register a pidfile
                  - set the is_active bit on the special task
    - run:
        - create a PidfileRunMonitor
        - pass the autoserv command, working directory etc. to the drone
          manager. This will start the actual autoserv process.
        - set the start bit: so subsequent polls do not 'start' again

- tick: For as long as a started task's done bit is not set, a poll will lead
        to a tick. The tick monitors the pid file of the autoserv process
        running on the drone through the PidfileRunMonitor created in prolog.
        If the autoserv process has finished, we call finished with True/False
        depending on the autoserv exit code.

        - finished: sets the done and success values, then calls epilog. The
                    done bit is important because the Agent polls this bit to
                    measure the success or failure of its task.

            - epilog: Is generally where we set the status of the Host/HQE
                      again, requeue any other task that needs to run after
                      this one, and perform cleanup. Just like the prolog,
                      this step is different per task.

                - cleanup: Sets the is_active, is_complete and success
                           states on the task's model. Also uses the
                           drone_manager to:
                               unregister the pidfile
                               copy results of the task
                           (Note this is not to be confused with the
                           special task called Cleanup.)

                The actions we take in the epilog are based on the
                success/failure of the autoserv process set in cleanup,
                e.g. if Reset failed we will enqueue a Repair, but if all
                is well the epilog will just return. Prejob task epilogs
                also have an on_pending method that changes the status of
                the HQE to Pending/Starting, which gets picked up in the
                scheduler.
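The poll/tick lifecycle above can be sketched as a minimal, self-contained
loop. This is an illustration only, not scheduler code: FakeTask is a made-up
stand-in for a BaseAgentTask whose process happens to exit successfully on
the third tick.

```python
# Minimal sketch of the poll/tick lifecycle. FakeTask is a hypothetical
# stand-in for a BaseAgentTask; the real class runs autoserv via the drone
# manager and reads its exit code from a pidfile.
class FakeTask(object):
    def __init__(self):
        self.started = False
        self.done = False
        self.success = None
        self._ticks_until_exit = 3

    def poll(self):
        # The first poll starts the task; subsequent polls tick it.
        if not self.started:
            self.started = True  # 'start' would run prolog() and run() here
        if not self.done:
            self.tick()

    def tick(self):
        # Stand-in for checking the autoserv pidfile for an exit code.
        self._ticks_until_exit -= 1
        if self._ticks_until_exit == 0:
            self.finished(True)

    def finished(self, success):
        # Sets done/success; the real task would then call epilog().
        self.done = True
        self.success = success


task = FakeTask()
polls = 0
while not task.done:  # the Agent keeps polling till the task is done
    task.poll()
    polls += 1
# After 3 polls: task.done is True, task.success is True.
```

The while loop plays the role of the Agent: it has no visibility into the
task beyond the done bit, which is exactly the contract the scheduler relies
on.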
By this point the is_done flag is set, which results in the Agent noticing that
the task has finished and unregistering it from the dispatcher.

Class hierarchy:
BaseAgentTask
 |--->SpecialAgentTask (prejob_task.py)
      |--->RepairTask
      |--->PreJobTask
           |--->Verify, Cleanup, Reset, Provision

 |--->AbstractQueueTask (monitor_db.py)
      |--->QueueTask
      |--->HostlessQueueTask

 |--->PostJobTask (postjob_task.py)
      |--->GatherLogsTask
      |--->SelfThrottledPostJobTask
           |--->FinalReparseTask
           |--->ArchiveResultsTask

"""

import logging
import os
import urllib
import time

from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server import autoserv_utils
from autotest_lib.server import system_utils


AUTOSERV_NICE_LEVEL = 10


class BaseAgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self._drone_manager = drone_manager.instance()
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        # A map between host id and hostname.
        self.hostnames = {}
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
            self.hostnames = dict((entry.host.id, entry.host.hostname)
                                  for entry in queue_entries)
        else:
            assert host
            self.host_ids = [host.id]
            self.hostnames = {host.id: host.hostname}


    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                    '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                            execution_entry,
                                            first_execution_path,
                                            execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run. Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this BaseAgentTask's process.
        It may only be approximate. To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this BaseAgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task. May be None. Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this BaseAgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this BaseAgentTask's process uses. To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        if not self._paired_with_monitor().has_process():
            metadata = {
                    '_type': 'scheduler_error',
                    'error': 'No paired results in task',
                    'task': str(self),
                    'pidfile_id': str(self._paired_with_monitor().pidfile_id)}
            autotest_stats.Counter('no_paired_results_in_task',
                                   metadata=metadata).increment()
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = pidfile_monitor.PidfileRunMonitor()


    def run(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(
            self, restricted_subnets=utils.RESTRICTED_SUBNETS):
        filtered_drones = None
        has_unrestricted_host = False
        if self.hostnames and restricted_subnets:
            for hostname in self.hostnames.values():
                subnet = utils.get_restricted_subnet(hostname,
                                                     restricted_subnets)

                # Return an empty set if the list of hosts spans both
                # restricted and unrestricted subnets. No drone can work in
                # such a case.
                if ((not subnet and filtered_drones is not None) or
                    (subnet and has_unrestricted_host)):
                    logging.error('The test has some DUTs in a restricted '
                                  'subnet, but some in an unrestricted '
                                  'subnet. Therefore, no drone is available '
                                  'to run the test.')
                    return set()

                if not subnet:
                    has_unrestricted_host = True
                    continue

                server_ip_map = system_utils.DroneCache.get_drone_ip_map()
                filtered_drones_for_host = set(
                        utils.get_servers_in_same_subnet(
                                subnet[0], subnet[1],
                                server_ip_map=server_ip_map))
                logging.info('DUT %s is in a restricted subnet; the drone '
                             'can only be chosen from %s', hostname,
                             filtered_drones_for_host)
                if filtered_drones is None:
                    filtered_drones = filtered_drones_for_host
                else:
                    filtered_drones = set.intersection(
                            filtered_drones, filtered_drones_for_host)

                # If filtered_drones is an empty set, no drone is allowed to
                # run the task. This is different from None, which means all
                # drones are allowed.
                if filtered_drones == set():
                    logging.error('DUT(s) are in a restricted subnet, but no '
                                  'drone is available to run the test.')
                    return filtered_drones

        # If no host is in a restricted subnet, use only the unrestricted
        # drones.
        if filtered_drones is None and restricted_subnets:
            filtered_drones = set(
                    system_utils.DroneCache.get_unrestricted_drones(
                            restricted_subnets=restricted_subnets))

        if not models.DroneSet.drone_sets_enabled():
            return filtered_drones

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries.
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        if filtered_drones:
            return set.intersection(filtered_drones,
                                    drone_set.get_drone_hostnames())
        else:
            return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warning('%s had no owner; using default drone set',
                            obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warning('User %s has no default drone set, using global '
                            'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        self._drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            self._drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # No process to recover; wait to be started normally.
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start entry with invalid status %s: '
                        '%s' % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start on queue entry with invalid '
                        'host status %s: %s'
                        % (class_name, entry.host.status, entry))


SiteAgentTask = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteAgentTask', BaseAgentTask)


class AgentTask(SiteAgentTask):
    pass


class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this."""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        assert self.monitor
        if not self.monitor.has_process():
            return
        self._drone_manager.write_lines_to_file(
                self._keyval_path(), [self._format_keyval(field, value)],
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # Always end with a newline to allow additional keyvals to be written.
        keyval_contents += '\n'
        self._drone_manager.attach_file_to_execution(self._working_directory(),
                                                     keyval_contents,
                                                     file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [urllib.quote(label) for label in all_labels]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = rdb_lib.get_hosts([task.host.id])[0]
        self.host.dbg_str = 'Task: %s' % str(task)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)
            self.host.dbg_str += self.queue_entry.get_dbg_str()

        self.task = task
        self._extra_command_args = extra_command_args
        self.host.metadata = self.get_metadata()


    def get_metadata(self):
        """Get a dictionary that contains task information.

        The return value is a dictionary that includes task information like
        id, name and related job information. The value will be stored in the
        metadata database.

        @return: A dictionary containing the task id, name and related job id.
                 If some attributes cannot be accessed, an empty dictionary
                 will be returned and an error will be logged.
        """
        try:
            metadata = {'task_id': self.task.id, 'task_name': self.task.task,
                        'hostname': self.task.host.hostname}
            if self.task.queue_entry:
                job = self.task.queue_entry.job
                metadata.update(
                        scheduler_models.get_job_metadata(job))
            return metadata
        except AttributeError as e:
            logging.error('Task has missing attribute: %s', e)
            return {}


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return autoserv_utils._autoserv_command_line(
                self.host.hostname,
                self._extra_command_args,
                queue_entry=self.queue_entry,
                in_lab=True)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return  # entry has been aborted

        self._actually_fail_queue_entry()


    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry. The
    # exact semantics of when one should and should not be failing a queue
    # entry need to be worked out, because provisioning has placed us in a
    # case where we want to fail a queue entry that could be requeued,
    # which makes us fail the two above if statements, and thus
    # _fail_queue_entry() would exit early and have no effect.
    # What's left here with _actually_fail_queue_entry is a hack to be able to
    # bypass the checks and unconditionally execute the code.
    def _actually_fail_queue_entry(self):
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # Copy results logs into the normal place for job results.
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        self._drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE.
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed".
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                self._drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove all queued special tasks of a given type, optionally keeping
        the last one.

        @param special_task_to_remove: type of special task to be removed,
                e.g., models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to keep the last special task if its type
                is the same as special_task_to_remove.

        """
        queued_special_tasks = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=special_task_to_remove,
                is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()


    def _generate_autoserv_label_args(self, task):
        """
        @param task: An instance of afe model's SpecialTask.

        @returns: The list of arguments to pass to autoserv to tell it what
                  the labels of a job are.
        """
        labels = {x.name for x in task.queue_entry.job.labels}
        return ['--job-labels', ','.join(labels)]
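The subnet handling in get_drone_hostnames_allowed reduces to a per-host set
intersection. The following standalone sketch shows the rule in isolation; the
function, hostnames, and subnet names are hypothetical and deliberately
simplified (the real method consults DroneCache and the restricted-subnets
config).

```python
# Hypothetical, simplified model of the drone-filtering rule: each DUT in a
# restricted subnet narrows the candidate drones to those in its subnet;
# mixing restricted and unrestricted DUTs leaves no usable drone.
def allowed_drones(duts, drones_by_subnet):
    """duts: list of (hostname, subnet_or_None); drones_by_subnet: dict."""
    filtered = None          # None means 'all drones allowed'
    saw_unrestricted = False
    for _hostname, subnet in duts:
        if ((subnet is None and filtered is not None) or
            (subnet is not None and saw_unrestricted)):
            return set()     # mixed subnets: no drone can serve both
        if subnet is None:
            saw_unrestricted = True
            continue
        per_host = drones_by_subnet.get(subnet, set())
        filtered = per_host if filtered is None else (filtered & per_host)
        if filtered == set():
            return filtered  # restricted DUTs share no common drone
    return filtered          # None => all drones allowed


drones = {'subnetA': {'drone1', 'drone2'}, 'subnetB': {'drone2'}}
allowed_drones([('dut1', 'subnetA'), ('dut2', 'subnetB')], drones)
# -> {'drone2'}
```

As in the real method, an empty set ("no drone may run this") is deliberately
distinct from None ("any drone may run this"), which is why the code compares
against set() rather than using truthiness.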