agent_task.py revision cc9fc70587d37775673e47b3dcb4d6ded0c6dcb4
#pylint: disable-msg=C0111

"""This is the module for everything related to the AgentTask.

The BaseAgentTask imposes an interface through which the scheduler can monitor
a process; examples of such processes include Verify, Cleanup and the Queue
Tasks that run the tests. The scheduler itself only understands Agents.
Agents:
    The Agent is the bridge between the scheduler and the AgentTask. The
    scheduler's tick has a method called handle_agents, which calls the
    tick of each agent in the Dispatcher's queue. This leads to the Agent
    polling its AgentTask. The scheduler will keep polling a task through
    the associated Agent till the Agent is removed from the dispatcher.

    At a high level:
        agents finished = tasks done
        agent polls till finished
            task polls till done
                task sets done
        agent is removed from dispatcher
AgentTasks:
    Basic AgentTasks are created when an hqe changes state. Examples of these
    are the QueueTask, which is created when an hqe goes into the Starting
    state, and the FinalReparseTask, which is created when the hqe goes into
    parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is
    inserted in the afe_special_tasks table. All PrejobTasks are
    SpecialAgentTasks.

Monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface, as
follows:
At a high level:
    task poll
        start
            prolog
        tick till we get an exit code
        finished(exit==0)
            done=True
            epilog
                cleanup
                    set is_active, is_complete, success (checked in scheduler)

The first special task for an HQE is usually Reset.
- poll: The first poll will start the task; polls thereafter will call the
    task's tick method. A started task will have the started bit set.
- start: Call prolog, run the process and set the start bit.
    - prolog: Usually where one puts any model state changes that happen
          before the actual task. Different per Task. Examples of things
          that might happen in a prolog:
          - state of Host, HQE (to something like Resetting)
          - delete any unwanted queued special tasks
          - register a pidfile
          - set the is_active bit on the special task
    - run:
        - create a PidfileRunMonitor
        - pass the autoserv command, working directory etc. to the drone
          manager. This will start the actual autoserv process.
    - set the start bit: so subsequent polls do not 'start' again

- tick: For as long as a started task's done bit is not set, a poll will lead
    to a tick. The tick monitors the pid file of the autoserv process
    running on the drone through the PidfileRunMonitor created in prolog.
    If the autoserv process has finished we call finished with True/False
    depending on the autoserv exit code.

    - finished: sets the done and success values, then calls epilog. The
          done bit is important because the Agent polls this bit to
          measure the success or failure of its task.

        - epilog: Is generally where we set the status of the Host/HQE
              again, requeue any other task that needs to run after this
              one and perform cleanup. Just like the prolog, this step is
              different per task.

            - cleanup: Sets the is_active, is_complete and success
                  states on the task's model. Also uses the
                  drone_manager to:
                      unregister the pidfile
                      copy results of the task
                  (Note this is not to be confused with the
                  special task called cleanup).

            The actions we take in the epilog are based on the
            success/failure of the autoserv process set in cleanup,
            e.g. if reset failed we will enqueue a repair, but if all
            is well the epilog will just return. Prejob task epilogs
            also have an on_pending method that changes the status of
            the HQE to pending/starting, which gets picked up in the
            scheduler.
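
The poll/tick/finished flow above can be sketched with stand-in objects.
This is an illustrative toy, not code from this module: FakeMonitor and
FakeTask are hypothetical names, and prolog/run/epilog are elided.

```python
class FakeMonitor(object):
    """Stands in for a PidfileRunMonitor; serves a canned exit-code list."""
    def __init__(self, codes):
        self._codes = codes

    def exit_code(self):
        # None means the process is still running; an int means it exited.
        return self._codes.pop(0)


class FakeTask(object):
    """Mirrors the shape of BaseAgentTask.poll/tick/finished."""
    def __init__(self):
        self.started = False
        self.done = False
        self.success = None
        # Pretend autoserv is still running for two ticks, then exits 0.
        self.monitor = FakeMonitor([None, None, 0])

    def poll(self):
        if not self.started:
            self.started = True          # start(): prolog + run elided
        if not self.done:
            self.tick()

    def tick(self):
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return                       # still running; Agent polls again
        self.done = True                 # finished(): epilog/cleanup elided
        self.success = (exit_code == 0)


task = FakeTask()
while not task.done:                     # the Agent keeps polling till done
    task.poll()
# After three polls task.done is True and task.success is True.
```

A real task's monitor is a PidfileRunMonitor watching an autoserv pidfile
on a drone; the canned exit-code list here merely stands in for it.
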
By this point the is_done flag is set, which results in the Agent noticing that
the task has finished and unregistering it from the dispatcher.

Class hierarchy:
BaseAgentTask
 |--->SpecialAgentTask (prejob_task.py)
      |--->RepairTask
      |--->PreJobTask
           |--->Verify, Cleanup, Reset, Provision

 |--->AbstractQueueTask (monitor_db.py)
      |--->QueueTask
      |--->HostlessQueueTask

 |--->PostJobTask (postjob_task.py)
      |--->GatherLogsTask
      |--->SelfThrottledPostJobTask
           |--->FinalReparseTask
           |--->ArchiveResultsTask

"""

import logging
import os
import urllib
import time

from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, pidfile_monitor
from autotest_lib.client.common_lib import utils
from autotest_lib.scheduler import email_manager, host_scheduler
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server import autoserv_utils


_drone_manager = drone_manager.instance()

AUTOSERV_NICE_LEVEL = 10


class BaseAgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run. Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this BaseAgentTask's process.
        It may only be approximate. To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this BaseAgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task. May be None. Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this BaseAgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this BaseAgentTask's process uses. To be
        overridden if necessary.
320 """ 321 return drone_manager.AUTOSERV_PID_FILE 322 323 324 def _check_paired_results_exist(self): 325 if not self._paired_with_monitor().has_process(): 326 email_manager.manager.enqueue_notify_email( 327 'No paired results in task', 328 'No paired results in task %s at %s' 329 % (self, self._paired_with_monitor().pidfile_id)) 330 self.finished(False) 331 return False 332 return True 333 334 335 def _create_monitor(self): 336 assert not self.monitor 337 self.monitor = pidfile_monitor.PidfileRunMonitor() 338 339 340 def run(self): 341 if not self._check_paired_results_exist(): 342 return 343 344 self._create_monitor() 345 self.monitor.run( 346 self._command_line(), self._working_directory(), 347 num_processes=self.num_processes, 348 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(), 349 pidfile_name=self._pidfile_name(), 350 paired_with_pidfile=self._paired_with_monitor().pidfile_id, 351 username=self.owner_username, 352 drone_hostnames_allowed=self.get_drone_hostnames_allowed()) 353 354 355 def get_drone_hostnames_allowed(self): 356 if not models.DroneSet.drone_sets_enabled(): 357 return None 358 359 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids) 360 if not hqes: 361 # Only special tasks could be missing host queue entries 362 assert isinstance(self, SpecialAgentTask) 363 return self._user_or_global_default_drone_set( 364 self.task, self.task.requested_by) 365 366 job_ids = hqes.values_list('job', flat=True).distinct() 367 assert job_ids.count() == 1, ("BaseAgentTask's queue entries " 368 "span multiple jobs") 369 370 job = models.Job.objects.get(id=job_ids[0]) 371 drone_set = job.drone_set 372 if not drone_set: 373 return self._user_or_global_default_drone_set(job, job.user()) 374 375 return drone_set.get_drone_hostnames() 376 377 378 def _user_or_global_default_drone_set(self, obj_with_owner, user): 379 """ 380 Returns the user's default drone set, if present. 381 382 Otherwise, returns the global default drone set. 
383 """ 384 default_hostnames = models.DroneSet.get_default().get_drone_hostnames() 385 if not user: 386 logging.warn('%s had no owner; using default drone set', 387 obj_with_owner) 388 return default_hostnames 389 if not user.drone_set: 390 logging.warn('User %s has no default drone set, using global ' 391 'default', user.login) 392 return default_hostnames 393 return user.drone_set.get_drone_hostnames() 394 395 396 def register_necessary_pidfiles(self): 397 pidfile_id = _drone_manager.get_pidfile_id_from( 398 self._working_directory(), self._pidfile_name()) 399 _drone_manager.register_pidfile(pidfile_id) 400 401 paired_pidfile_id = self._paired_with_monitor().pidfile_id 402 if paired_pidfile_id: 403 _drone_manager.register_pidfile(paired_pidfile_id) 404 405 406 def recover(self): 407 if not self._check_paired_results_exist(): 408 return 409 410 self._create_monitor() 411 self.monitor.attach_to_existing_process( 412 self._working_directory(), pidfile_name=self._pidfile_name(), 413 num_processes=self.num_processes) 414 if not self.monitor.has_process(): 415 # no process to recover; wait to be started normally 416 self.monitor = None 417 return 418 419 self.started = True 420 logging.info('Recovering process %s for %s at %s', 421 self.monitor.get_process(), type(self).__name__, 422 self._working_directory()) 423 424 425 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses, 426 allowed_host_statuses=None): 427 class_name = self.__class__.__name__ 428 for entry in queue_entries: 429 if entry.status not in allowed_hqe_statuses: 430 raise host_scheduler.SchedulerError( 431 '%s attempting to start entry with invalid status %s: ' 432 '%s' % (class_name, entry.status, entry)) 433 invalid_host_status = ( 434 allowed_host_statuses is not None 435 and entry.host.status not in allowed_host_statuses) 436 if invalid_host_status: 437 raise host_scheduler.SchedulerError( 438 '%s attempting to start on queue entry with invalid ' 439 'host status %s: %s' 440 % 
(class_name, entry.host.status, entry)) 441 442 443SiteAgentTask = utils.import_site_class( 444 __file__, 'autotest_lib.scheduler.site_monitor_db', 445 'SiteAgentTask', BaseAgentTask) 446 447class AgentTask(SiteAgentTask): 448 pass 449 450 451class TaskWithJobKeyvals(object): 452 """AgentTask mixin providing functionality to help with job keyval files.""" 453 _KEYVAL_FILE = 'keyval' 454 def _format_keyval(self, key, value): 455 return '%s=%s' % (key, value) 456 457 458 def _keyval_path(self): 459 """Subclasses must override this""" 460 raise NotImplementedError 461 462 463 def _write_keyval_after_job(self, field, value): 464 assert self.monitor 465 if not self.monitor.has_process(): 466 return 467 _drone_manager.write_lines_to_file( 468 self._keyval_path(), [self._format_keyval(field, value)], 469 paired_with_process=self.monitor.get_process()) 470 471 472 def _job_queued_keyval(self, job): 473 return 'job_queued', int(time.mktime(job.created_on.timetuple())) 474 475 476 def _write_job_finished(self): 477 self._write_keyval_after_job("job_finished", int(time.time())) 478 479 480 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path): 481 keyval_contents = '\n'.join(self._format_keyval(key, value) 482 for key, value in keyval_dict.iteritems()) 483 # always end with a newline to allow additional keyvals to be written 484 keyval_contents += '\n' 485 _drone_manager.attach_file_to_execution(self._working_directory(), 486 keyval_contents, 487 file_path=keyval_path) 488 489 490 def _write_keyvals_before_job(self, keyval_dict): 491 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path()) 492 493 494 def _write_host_keyvals(self, host): 495 keyval_path = os.path.join(self._working_directory(), 'host_keyvals', 496 host.hostname) 497 platform, all_labels = host.platform_and_labels() 498 all_labels = [ urllib.quote(label) for label in all_labels ] 499 keyval_dict = dict(platform=platform, labels=','.join(all_labels)) 500 
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = rdb_lib.get_hosts([task.host.id])[0]
        self.host.dbg_str = 'Task: %s' % str(task)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)
            self.host.dbg_str += self.queue_entry.get_dbg_str()

        self.task = task
        self._extra_command_args = extra_command_args


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return autoserv_utils._autoserv_command_line(
                self.host.hostname, self._extra_command_args,
                queue_entry=self.queue_entry)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return  # entry has been aborted

        self._actually_fail_queue_entry()


    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry. The exact semantics of
    # when one should and should not be failing a queue entry need to be
    # worked out, because provisioning has placed us in a case where we want
    # to fail a queue entry that could be requeued, which makes us fail the
    # two above if statements, and thus _fail_queue_entry() would exit early
    # and have no effect. What's left here with _actually_fail_queue_entry is
    # a hack to be able to bypass the checks and unconditionally execute the
    # code.
    def _actually_fail_queue_entry(self):
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = _drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove all queued special tasks of a given type, optionally keeping
        the last one.

        @param special_task_to_remove: type of special task to be removed, e.g.
                models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to keep the last special task if its type
                is the same as special_task_to_remove.

        """
        queued_special_tasks = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=special_task_to_remove,
                is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()