experiment_runner.py revision c87cb38b44bdaba67d1415386fcc7ff6a97e3365
#!/usr/bin/python

# Copyright 2011 Google Inc. All Rights Reserved.

"""The experiment runner module.

Locks the requested machines, runs an experiment's benchmark runs (optionally
through the Schedv2 per-dut thread scheduler), and prints, stores and emails
the results.
"""
import getpass
import os
import random
import shutil
import sys
import time
import traceback

import afe_lock_machine
from machine_image_manager import MachineImageManager

from collections import defaultdict
from utils import command_executer
from utils import logger
from utils.email_sender import EmailSender
from utils.file_utils import FileUtils
from threading import Lock
from threading import Thread

import config
from experiment_status import ExperimentStatus
from results_cache import CacheConditions
from results_cache import ResultsCache
from results_report import HTMLResultsReport
from results_report import TextResultsReport
from results_report import JSONResultsReport


class ExperimentRunner(object):
  """ExperimentRunner Class.

  Run() executes the experiment, then prints the text results table and,
  unless the run was interrupted, stores the results and emails a report.
  """

  # Minimum number of seconds between two status reports in _Run.
  STATUS_TIME_DELAY = 30
  # Number of seconds _Run sleeps between polls of experiment.IsComplete().
  THREAD_MONITOR_DELAY = 2

  def __init__(self, experiment, json_report, using_schedv2=False, log=None,
               cmd_exec=None):
    """Create a runner for `experiment`.

    Args:
      experiment: the experiment to run.
      json_report: if true, _StoreResults also writes a JSON results report.
      using_schedv2: if true, benchmark runs are dispatched through Schedv2.
      log: logger to use; defaults to a logger for experiment.log_dir.
      cmd_exec: command executer to use; defaults to one built on the logger.
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != "verbose":
      # Non-verbose mode reports status (or a progress dot) more often.
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Return a list of all requested machines.

    Create a list of all the requested machines, both global requests and
    label-specific requests, and return the list. Each machine appears once,
    in the order it was first requested.

    A new list is returned; self._experiment.remote is left unmodified.
    """
    requested = list(self._experiment.remote or [])
    for label in self._experiment.labels:
      if label.remote:
        requested.extend(label.remote)

    # Label-specific machines may also be listed globally; keep only the
    # first occurrence so no machine is handed to the lock manager twice.
    machines = []
    for m in requested:
      if m not in machines:
        machines.append(m)
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Update machines lists to contain only locked machines.

    Go through all the lists of requested machines, both global and
    label-specific requests, and remove any machine that we were not
    able to lock. The lists are modified in place.

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    # Rebuild each list via slice assignment instead of calling remove()
    # while iterating it, which would skip the element after every removal.
    remote = self._experiment.remote
    if remote:
      remote[:] = [m for m in remote if m in locked_machines]

    for label in self._experiment.labels:
      if label.remote:
        label.remote[:] = [m for m in label.remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempt to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.
    Machines that could not be locked are dropped from the experiment's
    machine lists and from its machine manager.

    Args:
      experiment: the experiment; its first label's chromeos_root is passed
        to the lock manager.

    Raises:
      RuntimeError: if no machine could be locked.
    """
    lock_mgr = afe_lock_machine.AFELockManager(
        self._GetMachineList(),
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    for m in lock_mgr.machines:
      if not lock_mgr.MachineIsKnown(m):
        lock_mgr.AddLocalMachine(m)
    machine_states = lock_mgr.GetMachineStates("lock")
    lock_mgr.CheckMachineLocks(machine_states, "lock")
    self.locked_machines = lock_mgr.UpdateMachines(True)
    self._experiment.locked_machines = self.locked_machines
    self._UpdateMachineList(self.locked_machines)
    self._experiment.machine_manager.RemoveNonLockedMachines(
        self.locked_machines)
    if not self.locked_machines:
      raise RuntimeError("Unable to lock any machines.")

  def _UnlockAllMachines(self, experiment):
    """Attempt to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock all of the machines
    requested for this crosperf run. It does nothing if no machine was locked.

    Args:
      experiment: the experiment; its first label's chromeos_root is passed
        to the lock manager.
    """
    if not self.locked_machines:
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    machine_states = lock_mgr.GetMachineStates("unlock")
    lock_mgr.CheckMachineLocks(machine_states, "unlock")
    lock_mgr.UpdateMachines(False)

  def _ClearCacheEntries(self, experiment):
    """Delete the cache directory of every benchmark run in `experiment`.

    Cache directories that do not exist yet are skipped; calling
    shutil.rmtree on a missing path would raise.
    """
    for br in experiment.benchmark_runs:
      cache = ResultsCache()
      cache.Init(br.label.chromeos_image, br.label.chromeos_root,
                 br.benchmark.test_name, br.iteration, br.test_args,
                 br.profiler_args, br.machine_manager, br.machine,
                 br.label.board, br.cache_conditions, br._logger, br.log_level,
                 br.label, br.share_cache, br.benchmark.suite,
                 br.benchmark.show_all_results, br.benchmark.run_local)
      cache_dir = cache._GetCacheDirForWrite()
      if os.path.exists(cache_dir):
        self.l.LogOutput("Removing cache dir: %s" % cache_dir)
        shutil.rmtree(cache_dir)

  def _Run(self, experiment):
    """Run `experiment` and wait for it to complete, reporting status.

    Machines are locked first unless experiment.locks_dir is set. Any
    machines locked here are unlocked on exit, including on error. Ctrl-C
    marks this runner as terminated and terminates the experiment.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      if CacheConditions.FALSE in experiment.cache_conditions:
        self._ClearCacheEntries(experiment)
      status = ExperimentStatus(experiment)
      experiment.Run()
      last_status_time = 0
      last_status_string = ""
      try:
        if experiment.log_level != "verbose":
          self.l.LogStartDots()
        while not experiment.IsComplete():
          if last_status_time + self.STATUS_TIME_DELAY < time.time():
            last_status_time = time.time()
            border = "=============================="
            if experiment.log_level == "verbose":
              self.l.LogOutput(border)
              self.l.LogOutput(status.GetProgressString())
              self.l.LogOutput(status.GetStatusString())
              self.l.LogOutput(border)
            else:
              # Only print the status when it changed; otherwise append a
              # progress dot.
              current_status_string = status.GetStatusString()
              if current_status_string != last_status_string:
                self.l.LogEndDots()
                self.l.LogOutput(border)
                self.l.LogOutput(current_status_string)
                self.l.LogOutput(border)
                last_status_string = current_status_string
              else:
                self.l.LogAppendDot()
          time.sleep(self.THREAD_MONITOR_DELAY)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError("Ctrl-c pressed. Cleaning up...")
        experiment.Terminate()
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _PrintTable(self, experiment):
    """Log the text results report of `experiment`."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _GetTextReportHtml(self, experiment):
    """Return the text results report wrapped in a <pre> HTML element.

    The report ends with a line naming experiment.results_directory. This
    body is used both for the results email and for msg_body.html.
    """
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    return "<pre style='font-size: 13px'>%s</pre>" % text_report

  def _Email(self, experiment):
    """Email the results of `experiment`, unless suppressed.

    Mail goes to the current user plus experiment.email_to, with the HTML
    report attached. Nothing is sent when the "no_email" config option is
    set, or when every benchmark run was a cache hit and no explicit
    email_to recipients were given.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not br.cache_hit for br in experiment.benchmark_runs)
    if ((not send_mail and not experiment.email_to) or
        config.GetConfig("no_email")):
      return

    label_names = [label.name for label in experiment.labels]
    subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

    text_report = self._GetTextReportHtml(experiment)
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment("report.html", html_report)
    email_to = [getpass.getuser()] + experiment.email_to
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type="html")

  def _StoreResults(self, experiment):
    """Write the experiment file, reports and per-run results to disk.

    experiment.results_directory is deleted and recreated first. Does
    nothing if the run was terminated.
    """
    if self._terminated:
      return
    results_directory = experiment.results_directory
    FileUtils().RmDir(results_directory)
    FileUtils().MkDirP(results_directory)
    self.l.LogOutput("Storing experiment file in %s." % results_directory)
    experiment_file_path = os.path.join(results_directory, "experiment.exp")
    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput("Storing results report in %s." % results_directory)
    results_table_path = os.path.join(results_directory, "results.html")
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      JSONResultsReport(experiment).GetReport(results_directory)
    FileUtils().WriteFile(results_table_path, report)

    self.l.LogOutput("Storing email message body in %s." % results_directory)
    msg_file_path = os.path.join(results_directory, "msg_body.html")
    FileUtils().WriteFile(msg_file_path, self._GetTextReportHtml(experiment))

    self.l.LogOutput("Storing results of each benchmark run.")
    for benchmark_run in experiment.benchmark_runs:
      if benchmark_run.result:
        # Keep only alphanumeric characters for the directory name. A join is
        # used because filter() does not return a str on Python 3.
        benchmark_run_name = "".join(
            c for c in benchmark_run.name if c.isalnum())
        benchmark_run_path = os.path.join(results_directory,
                                          benchmark_run_name)
        benchmark_run.result.CopyResultsTo(benchmark_run_path)
        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Run the experiment, print results, and store/email them if finished."""
    self._Run(self._experiment)
    self._PrintTable(self._experiment)
    if not self._terminated:
      self._StoreResults(self._experiment)
      self._Email(self._experiment)


class DutWorker(Thread):
  """Thread that drives a single dut for Schedv2.

  Repeatedly takes a benchmark_run matching the dut's current label from the
  scheduler and executes it. When none is left, it asks the scheduler for
  another label and reimages the dut to it.
  """

  def __init__(self, dut, sched):
    super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
    self._dut = dut
    self._sched = sched
    self._stat_num_br_run = 0
    self._stat_num_reimage = 0
    self._stat_annotation = ""
    self._l = logger.GetLogger(self._sched._experiment.log_dir)
    # Daemon thread: does not keep the interpreter alive on exit.
    self.daemon = True
    self._terminated = False
    self._active_br = None
    # Race condition accessing _active_br between _execute_benchmark_run and
    # _terminate, so lock it up.
    self._active_br_lock = Lock()

  def terminate(self):
    """Stop this worker and terminate the benchmark_run it is executing."""
    self._terminated = True
    with self._active_br_lock:
      if self._active_br is not None:
        # BenchmarkRun.Terminate() terminates any running testcase via
        # suite_runner.Terminate and updates timeline.
        self._active_br.Terminate()

  def run(self):
    """Do the "run-test->(optionally reimage)->run-test" chore.

    Note - 'br' below means 'benchmark_run'.
    """
    try:
      # Inside the try so the scheduler is always notified below, even if
      # label detection fails unexpectedly; otherwise Schedv2.is_complete()
      # could never become true.
      self._setup_dut_label()
      self._l.LogOutput("{} started.".format(self))
      while not self._terminated:
        br = self._sched.get_benchmark_run(self._dut)
        if br is None:
          # No br left for this label. Considering reimaging.
          label = self._sched.allocate_label(self._dut)
          if label is None:
            # No br even for other labels. We are done.
            self._l.LogOutput("ImageManager found no label "
                              "for dut, stopping working "
                              "thread {}.".format(self))
            break
          if self._reimage(label):
            # Reimage to run other br fails, dut is doomed, stop
            # this thread.
            self._l.LogWarning("Re-image failed, dut "
                               "in an unstable state, stopping "
                               "working thread {}.".format(self))
            break
        else:
          # Execute the br.
          self._execute_benchmark_run(br)
    finally:
      self._stat_annotation = "finished"
      # Thread finishes. Notify scheduler that I'm done.
      self._sched.dut_worker_finished(self)

  def _reimage(self, label):
    """Reimage the dut to label.

    Args:
      label: the label to reimage onto the dut.

    Returns:
      0 if successful, otherwise 1.
    """

    # Termination could happen anywhere, check it.
    if self._terminated:
      return 1

    self._l.LogOutput('Reimaging {} using {}'.format(self, label))
    self._stat_num_reimage += 1
    self._stat_annotation = 'reimaging using "{}"'.format(label.name)
    try:
      # Note, only 1 reimage at any given time, this is guaranteed in
      # ImageMachine, so no sync needed below.
      retval = self._sched._experiment.machine_manager.ImageMachine(
          self._dut, label)
      if retval:
        return 1
    except Exception:
      # An imaging error counts as a failed reimage. Print the traceback
      # rather than silently discarding it.
      traceback.print_exc(file=sys.stdout)
      return 1

    self._dut.label = label
    return 0

  def _execute_benchmark_run(self, br):
    """Execute a single benchmark_run.

    br.run() is not expected to throw. If it does, the try/finally still
    reports the run as finished and clears _active_br.
    """

    # Termination could happen anywhere, check it.
    if self._terminated:
      return

    self._l.LogOutput('{} started working on {}'.format(self, br))
    self._stat_num_br_run += 1
    self._stat_annotation = 'executing {}'.format(br)
    # benchmark_run.run does not throw, but just play it safe here.
    try:
      assert br.owner_thread is None
      br.owner_thread = self
      with self._active_br_lock:
        self._active_br = br
      br.run()
    finally:
      self._sched._experiment.BenchmarkRunFinished(br)
      with self._active_br_lock:
        self._active_br = None

  def _setup_dut_label(self):
    """Try to match dut image with a certain experiment label.

    Reads the image checksum file on the dut and compares it with each
    label's checksum. On a match, dut.label is set to that label, so a
    reimage is skipped and benchmark_runs can be executed right away.
    Otherwise, or if the check fails, dut.label is set to None.
    """

    checksum_file = "/usr/local/osimage_checksum_file"
    try:
      rv, checksum, _ = command_executer.GetCommandExecuter().CrosRunCommand(
          "cat " + checksum_file,
          return_output=True,
          chromeos_root=self._sched._labels[0].chromeos_root,
          machine=self._dut.name)
      if rv == 0:
        checksum = checksum.strip()
        for l in self._sched._labels:
          if l.checksum == checksum:
            self._l.LogOutput(
                "Dut '{}' is pre-installed with '{}'".format(
                    self._dut.name, l))
            self._dut.label = l
            return
    except Exception:
      # Best effort: any failure just means "unknown label" below.
      traceback.print_exc(file=sys.stdout)
    self._dut.label = None

  def __str__(self):
    return 'DutWorker[dut="{}", label="{}"]'.format(
        self._dut.name, self._dut.label.name if self._dut.label else "None")

  def dut(self):
    """Return the dut driven by this worker."""
    return self._dut

  def status_str(self):
    """Report thread status."""

    return ('Worker thread "{}", label="{}", benchmark_run={}, '
            'reimage={}, now {}'.format(
                self._dut.name,
                'None' if self._dut.label is None else self._dut.label.name,
                self._stat_num_br_run,
                self._stat_num_reimage,
                self._stat_annotation))


class Schedv2(object):
  """New scheduler for crosperf.

  Runs one DutWorker thread per dut. Workers pull benchmark_runs grouped by
  label and ask MachineImageManager which label to reimage to next.
  """

  def __init__(self, experiment):
    self._experiment = experiment
    self._l = logger.GetLogger(experiment.log_dir)

    # Create shortcuts to nested data structure. "_duts" points to a list of
    # locked machines. _labels points to a list of all labels.
    self._duts = self._experiment.machine_manager._all_machines
    self._labels = self._experiment.labels

    # Mapping from label to a list of benchmark_runs.
    self._label_brl_map = {l: [] for l in self._labels}
    for br in self._experiment.benchmark_runs:
      assert br.label in self._label_brl_map
      self._label_brl_map[br.label].append(br)

    # Use machine image manager to calculate initial label allocation.
    self._mim = MachineImageManager(self._labels, self._duts)
    self._mim.compute_initial_allocation()

    # Create worker thread, 1 per dut.
    self._active_workers = [DutWorker(dut, self) for dut in self._duts]
    self._finished_workers = []

    # Bookkeeping for synchronization.
    self._workers_lock = Lock()
    self._lock_map = defaultdict(Lock)
    # Guards lazy creation of entries in _lock_map, so concurrent workers
    # asking for the same key always receive the same Lock.
    self._lock_map_lock = Lock()

    # Termination flag.
    self._terminated = False

  def run_sched(self):
    """Start all dut worker threads and return immediately."""
    # Iterate over a snapshot: a worker that finishes quickly removes itself
    # from _active_workers, which would otherwise make this loop skip (and
    # never start) the next worker.
    for worker in list(self._active_workers):
      worker.start()

  def get_benchmark_run(self, dut):
    """Get a benchmark_run (br) object for a certain dut.

    Arguments:
      dut: the dut for which a br is returned.

    Returns:
      A br with its label matching that of the dut. If no such br could be
      found, return None (this usually means a reimage is required for the
      dut).
    """

    # If terminated, stop providing any br.
    if self._terminated:
      return None

    # If dut bears an unrecognized label, return None.
    if dut.label is None:
      return None

    # If br list for the dut's label is empty (that means all brs for this
    # label have been done), return None.
    with self._lock_on(dut.label):
      brl = self._label_brl_map[dut.label]
      if not brl:
        return None
      # Return the first br.
      return brl.pop(0)

  def allocate_label(self, dut):
    """Allocate a label to a dut.

    The work is delegated to MachineImageManager.

    The dut_worker calling this method is responsible for reimaging the dut
    to this label.

    Arguments:
      dut: the dut for which a new label is allocated.

    Returns:
      The label or None.
    """

    if self._terminated:
      return None

    return self._mim.allocate(dut, self)

  def dut_worker_finished(self, dut_worker):
    """Notify schedv2 that the dut_worker thread finished.

    Arguments:
      dut_worker: the thread that is about to end.
    """

    self._l.LogOutput("{} finished.".format(dut_worker))
    with self._workers_lock:
      self._active_workers.remove(dut_worker)
      self._finished_workers.append(dut_worker)

  def is_complete(self):
    """Return True once every dut worker thread has finished."""
    return len(self._active_workers) == 0

  def _lock_on(self, obj):
    """Return the Lock associated with obj, creating it on first use."""
    with self._lock_map_lock:
      return self._lock_map[obj]

  def terminate(self):
    """Mark flag so we stop providing br/reimages.

    Also terminate each DutWorker, so they refuse to execute br or reimage.
    """

    self._terminated = True
    # Snapshot under the lock: workers remove themselves from
    # _active_workers concurrently. Terminate them outside the lock so a
    # slow br.Terminate() does not block dut_worker_finished.
    with self._workers_lock:
      workers = list(self._active_workers)
    for dut_worker in workers:
      dut_worker.terminate()

  def threads_status_as_string(self):
    """Report the dut worker threads status."""

    with self._workers_lock:
      active = list(self._active_workers)
      finished = list(self._finished_workers)

    status = "{} active threads, {} finished threads.\n".format(
        len(active), len(finished))
    status += "  Active threads:"
    for dw in active:
      status += '\n    ' + dw.status_str()
    if finished:
      status += "\n  Finished threads:"
      for dw in finished:
        status += '\n    ' + dw.status_str()
    return status


class MockExperimentRunner(ExperimentRunner):
  """Mocked ExperimentRunner for testing."""

  def __init__(self, experiment, json_report=False):
    # The base constructor requires json_report; it used to be omitted here,
    # so constructing the mock always raised TypeError.
    super(MockExperimentRunner, self).__init__(experiment, json_report)

  def _Run(self, experiment):
    self.l.LogOutput("Would run the following experiment: '%s'." %
                     experiment.name)

  def _PrintTable(self, experiment):
    self.l.LogOutput("Would print the experiment table.")

  def _Email(self, experiment):
    self.l.LogOutput("Would send result email.")

  def _StoreResults(self, experiment):
    self.l.LogOutput("Would store the results.")