#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
"""

import os

from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils


_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')


class PostJobTask(agent_task.AgentTask):
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                self._drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None:  # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted):  # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own process limit.

    We throttle tasks like parsing because we don't want them to
    hold up tests. At the same time we don't wish to build up load
    that will take forever to parse.
    """
    _num_running_processes = 0
    # Last known limit of max processes, used to check whether
    # max processes config has been changed.
    _last_known_max_processes = 0
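    # NOTE: these are class attributes and the classmethods below update them
    # via cls, so each SelfThrottledPostJobTask subclass ends up tracking its
    # own running-process count and notification state.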
    # Whether an email should be sent to notify that the process limit was hit.
    _notification_on = True
    # Once the process limit is hit, an email will be sent.
    # To prevent spam, do not send another email until the running-process
    # count drops below the following level.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        autotest_stats.Gauge('scheduler').send(
                '%s.num_running_processes' % cls.__name__,
                cls._num_running_processes)


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        autotest_stats.Gauge('scheduler').send(
                '%s.num_running_processes' % cls.__name__,
                cls._num_running_processes)


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that the process limit has been hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
        1) The max processes configuration limit changes;
        2) _notification_on is False and the number of running processes
           drops below the fraction of the limit defined by
           REVIVE_NOTIFICATION_THRESHOLD.
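
        For example (illustrative numbers; the actual limit comes from
        _max_processes()): with a limit of 5 and a threshold of 0.80, the
        notification switch is turned back on once fewer than 4 processes
        are running.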

        """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or num_tests_failed > 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
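                # the host's final status will then be set through the cleanup
                # rather than directly (see the class docstring above).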
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class FinalReparseTask(SelfThrottledPostJobTask):
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '--record-duration',
                '-l', '2', '-r', '-o', results_dir]


    @property
    def num_processes(self):
        return 0  # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            self._drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())
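

# Rough post-job flow, for orientation only (a sketch; the ordering is driven
# by the scheduler, outside this module, and relies on the inherited
# _parse_results()/_archive_results() helpers):
#
#   GatherLogsTask(queue_entries)     - collects crash info; in epilog(),
#                                       requests parsing and reboots or
#                                       cleans up hosts
#   FinalReparseTask(queue_entries)   - throttled tko/parse run for entries in
#                                       PARSING; requests archiving in epilog()
#   ArchiveResultsTask(queue_entries) - throttled archiving for entries in
#                                       ARCHIVING; sets the final HQE status
#                                       in epilog()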