# experiment_runner.py revision 245e0635a5ae426c5345a06baf807940d1784125
#!/usr/bin/python

# Copyright 2011 Google Inc. All Rights Reserved.

"""The experiment runner module."""
import getpass
import os
import random
import sys
import time
import traceback

import afe_lock_machine
from machine_image_manager import MachineImageManager

from collections import defaultdict
from utils import command_executer
from utils import logger
from utils.email_sender import EmailSender
from utils.file_utils import FileUtils
from threading import Lock
from threading import Thread

import config
from experiment_status import ExperimentStatus
from results_report import HTMLResultsReport
from results_report import TextResultsReport
from results_report import JSONResultsReport


class ExperimentRunner(object):
    """Runs one experiment: lock machines, run, report, store and email."""

    # Seconds between two status reports while the experiment is running.
    STATUS_TIME_DELAY = 30
    # Seconds to sleep between two completion polls.
    THREAD_MONITOR_DELAY = 2

    def __init__(self, experiment, json_report, using_schedv2=False, log=None,
                 cmd_exec=None):
        """Set up the runner.

        Args:
          experiment: the experiment object to run.
          json_report: if true, a JSON report is also written when results
            are stored.
          using_schedv2: if true, a Schedv2 instance is attached to the
            experiment before it is run.
          log: logger to use; defaults to a logger on experiment.log_dir.
          cmd_exec: command executer to use; defaults to one built on the
            logger.
        """
        self._experiment = experiment
        self.l = log or logger.GetLogger(experiment.log_dir)
        self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
        self._terminated = False
        self.json_report = json_report
        self.locked_machines = []
        if experiment.log_level != "verbose":
            # Shadows the class-level default on this instance only.
            self.STATUS_TIME_DELAY = 10

        # Setting this to True will use crosperf sched v2 (feature in
        # progress).
        self._using_schedv2 = using_schedv2

    def _GetMachineList(self):
        """Return a list of all requested machines.

        Create a new list of all the requested machines, both global
        requests and label-specific requests, and return it.  The
        experiment's own lists are left untouched.
        """
        # Copy first: "+=" on the experiment's list itself would extend it
        # in place on every call.
        machines = list(self._experiment.remote)
        for label in self._experiment.labels:
            if label.remote:
                machines += label.remote
        return machines

    def _UpdateMachineList(self, locked_machines):
        """Update machines lists to contain only locked machines.

        Go through all the lists of requested machines, both global and
        label-specific requests, and remove any machine that we were not
        able to lock.

        Args:
          locked_machines: A list of the machines we successfully locked.
        """
        # Slice assignment filters in place, so the list objects keep their
        # identity, and avoids removing items from a list while iterating
        # over it (which skips the element following each removal).
        remote = self._experiment.remote
        remote[:] = [m for m in remote if m in locked_machines]

        for label in self._experiment.labels:
            if label.remote:
                label.remote[:] = [m for m in label.remote
                                   if m in locked_machines]

    def _LockAllMachines(self, experiment):
        """Attempt to globally lock all of the machines requested for run.

        This method will use the AFE server to globally lock all of the
        machines requested for this crosperf run, to prevent any other
        crosperf runs from being able to update/use the machines while this
        experiment is running.

        Raises:
          RuntimeError: if no machine could be locked.
        """
        lock_mgr = afe_lock_machine.AFELockManager(
            self._GetMachineList(),
            "",
            experiment.labels[0].chromeos_root,
            None,
            log=self.l,
        )
        for m in lock_mgr.machines:
            if not lock_mgr.MachineIsKnown(m):
                lock_mgr.AddLocalMachine(m)
        machine_states = lock_mgr.GetMachineStates("lock")
        lock_mgr.CheckMachineLocks(machine_states, "lock")
        self.locked_machines = lock_mgr.UpdateMachines(True)
        self._experiment.locked_machines = self.locked_machines
        self._UpdateMachineList(self.locked_machines)
        self._experiment.machine_manager.RemoveNonLockedMachines(
            self.locked_machines)
        if not self.locked_machines:
            raise RuntimeError("Unable to lock any machines.")

    def _UnlockAllMachines(self, experiment):
        """Attempt to globally unlock all of the machines requested for run.

        The method will use the AFE server to globally unlock all of the
        machines requested for this crosperf run.  It does nothing when no
        machine is recorded as locked.
        """
        if not self.locked_machines:
            return

        lock_mgr = afe_lock_machine.AFELockManager(
            self.locked_machines,
            "",
            experiment.labels[0].chromeos_root,
            None,
            log=self.l,
        )
        machine_states = lock_mgr.GetMachineStates("unlock")
        lock_mgr.CheckMachineLocks(machine_states, "unlock")
        lock_mgr.UpdateMachines(False)

    def _LogStatus(self, experiment, status, last_status_string):
        """Emit one periodic status report.

        In verbose mode the progress and status strings are always logged.
        Otherwise the status string is logged only when it differs from the
        previously logged one, and a progress dot is appended when it does
        not.

        Args:
          experiment: the running experiment (only log_level is read).
          status: the ExperimentStatus to query.
          last_status_string: the status string logged last time.

        Returns:
          The status string to remember for the next call.
        """
        border = "=============================="
        if experiment.log_level == "verbose":
            self.l.LogOutput(border)
            self.l.LogOutput(status.GetProgressString())
            self.l.LogOutput(status.GetStatusString())
            self.l.LogOutput(border)
            return last_status_string

        current_status_string = status.GetStatusString()
        if current_status_string == last_status_string:
            self.l.LogAppendDot()
            return last_status_string

        self.l.LogEndDots()
        self.l.LogOutput(border)
        self.l.LogOutput(current_status_string)
        self.l.LogOutput(border)
        return current_status_string

    def _Run(self, experiment):
        """Run the experiment and poll until it is complete.

        Machines are locked first unless experiment.locks_dir is set, and
        are unlocked again in all cases.  Ctrl-C marks the runner as
        terminated and terminates the experiment.
        """
        try:
            if not experiment.locks_dir:
                self._LockAllMachines(experiment)
            if self._using_schedv2:
                schedv2 = Schedv2(experiment)
                experiment.set_schedv2(schedv2)
            status = ExperimentStatus(experiment)
            experiment.Run()
            last_status_time = 0
            last_status_string = ""
            try:
                if experiment.log_level != "verbose":
                    self.l.LogStartDots()
                while not experiment.IsComplete():
                    if (last_status_time + self.STATUS_TIME_DELAY <
                            time.time()):
                        last_status_time = time.time()
                        last_status_string = self._LogStatus(
                            experiment, status, last_status_string)
                    time.sleep(self.THREAD_MONITOR_DELAY)
            except KeyboardInterrupt:
                self._terminated = True
                self.l.LogError("Ctrl-c pressed. Cleaning up...")
                experiment.Terminate()
        finally:
            if not experiment.locks_dir:
                self._UnlockAllMachines(experiment)

    def _PrintTable(self, experiment):
        """Log the text results report of the experiment."""
        self.l.LogOutput(TextResultsReport(experiment).GetReport())

    def _GetMessageBody(self, experiment):
        """Return the HTML-wrapped text report used as the email body."""
        text_report = TextResultsReport(experiment, True).GetReport()
        text_report += ("\nResults are stored in %s.\n" %
                        experiment.results_directory)
        return "<pre style='font-size: 13px'>%s</pre>" % text_report

    def _Email(self, experiment):
        """Email the results report.

        Nothing is sent when the "no_email" config is set, or when every
        benchmark run was a cache hit and no explicit recipients were
        given.  The current user is always added to the recipients.
        """
        # Only email by default if a new run was completed.
        send_mail = any(not benchmark_run.cache_hit
                        for benchmark_run in experiment.benchmark_runs)
        if ((not send_mail and not experiment.email_to) or
                config.GetConfig("no_email")):
            return

        label_names = [label.name for label in experiment.labels]
        subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

        text_report = self._GetMessageBody(experiment)
        html_report = HTMLResultsReport(experiment).GetReport()
        attachment = EmailSender.Attachment("report.html", html_report)
        # email_to may be empty/unset when the mail is triggered only by a
        # fresh run; fall back to an empty recipient list in that case.
        email_to = [getpass.getuser()] + list(experiment.email_to or [])
        EmailSender().SendEmail(email_to,
                                subject,
                                text_report,
                                attachments=[attachment],
                                msg_type="html")

    def _StoreResults(self, experiment):
        """Write the experiment file, reports and per-run results to disk.

        The results directory is removed and recreated first.  Nothing is
        stored when the run was terminated.
        """
        if self._terminated:
            return
        results_directory = experiment.results_directory
        FileUtils().RmDir(results_directory)
        FileUtils().MkDirP(results_directory)
        self.l.LogOutput("Storing experiment file in %s." % results_directory)
        experiment_file_path = os.path.join(results_directory,
                                            "experiment.exp")
        FileUtils().WriteFile(experiment_file_path,
                              experiment.experiment_file)

        self.l.LogOutput("Storing results report in %s." % results_directory)
        results_table_path = os.path.join(results_directory, "results.html")
        report = HTMLResultsReport(experiment).GetReport()
        if self.json_report:
            JSONResultsReport(experiment).GetReport(results_directory)
        FileUtils().WriteFile(results_table_path, report)

        self.l.LogOutput("Storing email message body in %s." %
                         results_directory)
        msg_file_path = os.path.join(results_directory, "msg_body.html")
        FileUtils().WriteFile(msg_file_path,
                              self._GetMessageBody(experiment))

        self.l.LogOutput("Storing results of each benchmark run.")
        for benchmark_run in experiment.benchmark_runs:
            if benchmark_run.result:
                # Keep only alphanumeric characters for the directory name.
                # An explicit join yields a string on every Python version,
                # unlike filter(), which returns an iterator on Python 3.
                benchmark_run_name = "".join(
                    c for c in benchmark_run.name if c.isalnum())
                benchmark_run_path = os.path.join(results_directory,
                                                  benchmark_run_name)
                benchmark_run.result.CopyResultsTo(benchmark_run_path)
                benchmark_run.result.CleanUp(
                    benchmark_run.benchmark.rm_chroot_tmp)

    def Run(self):
        """Run the experiment, print the table, then store and email."""
        self._Run(self._experiment)
        self._PrintTable(self._experiment)
        if not self._terminated:
            self._StoreResults(self._experiment)
            self._Email(self._experiment)


class DutWorker(Thread):
    """Worker thread that runs benchmark_runs on a single dut."""

    def __init__(self, dut, sched):
        """Create the (daemon) worker thread.

        Args:
          dut: the machine this worker drives.
          sched: the Schedv2 instance handing out work.
        """
        super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
        self._dut = dut
        self._sched = sched
        self._stat_num_br_run = 0
        self._stat_num_reimage = 0
        self._stat_annotation = ""
        self._l = logger.GetLogger(self._sched._experiment.log_dir)
        self.daemon = True
        self._terminated = False
        self._active_br = None
        # Race condition accessing _active_br between _execute_benchmark_run
        # and _terminate, so lock it up.
        self._active_br_lock = Lock()

    def terminate(self):
        """Flag the worker as terminated and stop its active br, if any."""
        self._terminated = True
        with self._active_br_lock:
            if self._active_br is not None:
                # BenchmarkRun.Terminate() terminates any running testcase
                # via suite_runner.Terminate and updates timeline.
                self._active_br.Terminate()

    def run(self):
        """Do the "run-test->(optionally reimage)->run-test" chore.

        Note - 'br' below means 'benchmark_run'.
        """

        self._setup_dut_label()
        try:
            self._l.LogOutput("{} started.".format(self))
            while not self._terminated:
                br = self._sched.get_benchmark_run(self._dut)
                if br is None:
                    # No br left for this label. Considering reimaging.
                    label = self._sched.allocate_label(self._dut)
                    if label is None:
                        # No br even for other labels. We are done.
                        self._l.LogOutput("ImageManager found no label "
                                          "for dut, stopping working "
                                          "thread {}.".format(self))
                        break
                    if self._reimage(label):
                        # Reimage to run other br fails, dut is doomed, stop
                        # this thread.
                        self._l.LogWarning("Re-image failed, dut "
                                           "in an unstable state, stopping "
                                           "working thread {}.".format(self))
                        break
                else:
                    # Execute the br.
                    self._execute_benchmark_run(br)
        finally:
            self._stat_annotation = "finished"
            # Thread finishes. Notify scheduler that I'm done.
            self._sched.dut_worker_finished(self)

    def _reimage(self, label):
        """Reimage image to label.

        Args:
          label: the label to reimage onto dut.

        Returns:
          0 if successful, otherwise 1.
        """

        # Termination could happen anywhere, check it.
        if self._terminated:
            return 1

        self._l.LogOutput('Reimaging {} using {}'.format(self, label))
        self._stat_num_reimage += 1
        self._stat_annotation = 'reimaging using "{}"'.format(label.name)
        try:
            # Note, only 1 reimage at any given time, this is guaranteed in
            # ImageMachine, so no sync needed below.
            retval = self._sched._experiment.machine_manager.ImageMachine(
                self._dut, label)
        except Exception:
            # Any failure counts as a failed reimage, but leave a trace of
            # the cause instead of swallowing it silently.
            self._l.LogError('Exception while reimaging {}:\n{}'.format(
                self, traceback.format_exc()))
            return 1
        if retval:
            return 1

        self._dut.label = label
        return 0

    def _execute_benchmark_run(self, br):
        """Execute a single benchmark_run.

        The experiment is always notified that the br finished and the
        active br is always cleared, but there is no except clause here: an
        exception raised by br.run() (or the ownership assertion)
        propagates to the caller after that cleanup.
        """

        # Termination could happen anywhere, check it.
        if self._terminated:
            return

        self._l.LogOutput('{} started working on {}'.format(self, br))
        self._stat_num_br_run += 1
        self._stat_annotation = 'executing {}'.format(br)
        # benchmark_run.run is not expected to throw, but play it safe here.
        try:
            assert br.owner_thread is None
            br.owner_thread = self
            with self._active_br_lock:
                self._active_br = br
            br.run()
        finally:
            self._sched._experiment.BenchmarkRunFinished(br)
            with self._active_br_lock:
                self._active_br = None

    def _setup_dut_label(self):
        """Try to match dut image with a certain experiment label.

        If such match is found, we just skip doing reimage and jump to
        execute some benchmark_runs.  Otherwise the dut label is set to
        None.
        """

        checksum_file = "/usr/local/osimage_checksum_file"
        try:
            rv, checksum, _ = command_executer.GetCommandExecuter().\
                CrosRunCommand(
                    "cat " + checksum_file,
                    return_output=True,
                    chromeos_root=self._sched._labels[0].chromeos_root,
                    machine=self._dut.name)
            if rv == 0:
                checksum = checksum.strip()
                for l in self._sched._labels:
                    if l.checksum == checksum:
                        self._l.LogOutput(
                            "Dut '{}' is pre-installed with '{}'".format(
                                self._dut.name, l))
                        self._dut.label = l
                        return
        except Exception:
            # Best effort: a failure to probe the dut only means that we
            # cannot skip the reimage.
            traceback.print_exc(file=sys.stdout)
        self._dut.label = None

    def __str__(self):
        return 'DutWorker[dut="{}", label="{}"]'.format(
            self._dut.name,
            self._dut.label.name if self._dut.label else "None")

    def dut(self):
        """Return the dut driven by this worker."""
        return self._dut

    def status_str(self):
        """Report thread status."""

        return ('Worker thread "{}", label="{}", benchmark_run={}, '
                'reimage={}, now {}'.format(
                    self._dut.name,
                    ('None' if self._dut.label is None
                     else self._dut.label.name),
                    self._stat_num_br_run,
                    self._stat_num_reimage,
                    self._stat_annotation))


class Schedv2(object):
    """New scheduler for crosperf."""

    def __init__(self, experiment):
        """Build the label->benchmark_run map and one worker per dut."""
        self._experiment = experiment
        self._l = logger.GetLogger(experiment.log_dir)

        # Create shortcuts to nested data structure. "_duts" points to a
        # list of locked machines. _labels points to a list of all labels.
        self._duts = self._experiment.machine_manager._all_machines
        self._labels = self._experiment.labels

        # Mapping from label to a list of benchmark_runs.
        self._label_brl_map = {l: [] for l in self._labels}
        for br in self._experiment.benchmark_runs:
            assert br.label in self._label_brl_map
            self._label_brl_map[br.label].append(br)

        # Use machine image manager to calculate initial label allocation.
        self._mim = MachineImageManager(self._labels, self._duts)
        self._mim.compute_initial_allocation()

        # Create worker thread, 1 per dut.
        self._active_workers = [DutWorker(dut, self) for dut in self._duts]
        self._finished_workers = []

        # Bookkeeping for synchronization.
        self._workers_lock = Lock()
        self._lock_map = defaultdict(Lock)

        # Termination flag.
        self._terminated = False

    def _check_machines_are_same(self):
        """Check if all machines are the same."""

        for l in self._labels:
            if not self._experiment.machine_manager.AreAllMachineSame(l):
                self._l.LogError('All machines are NOT same for '
                                 'label "{}" ...'.format(l.name))
                return False
            self._l.LogOutput('Machines are same for '
                              'label "{}" ...'.format(l.name))
        return True

    def _snapshot_workers(self):
        """Return copies of the (active, finished) worker lists.

        Workers remove themselves from the active list when they finish, so
        callers iterate over a snapshot rather than the live list.
        """
        with self._workers_lock:
            return list(self._active_workers), list(self._finished_workers)

    def run_sched(self):
        """Start all dut worker threads and return immediately.

        Raises:
          RuntimeError: if the machines of some label are not all the same.
        """

        if not self._check_machines_are_same():
            raise RuntimeError("All machines are not same.")

        # A worker that finishes quickly removes itself from
        # _active_workers, so start from a snapshot to avoid skipping one.
        active, _ = self._snapshot_workers()
        for worker in active:
            worker.start()

    def get_benchmark_run(self, dut):
        """Get a benchmark_run (br) object for a certain dut.

        Arguments:
          dut: the dut for which a br is returned.

        Returns:
          A br with its label matching that of the dut. If no such br could
          be found, return None (this usually means a reimage is required
          for the dut).
        """

        # If terminated, stop providing any br.
        if self._terminated:
            return None

        # If dut bears an unrecognized label, return None.
        if dut.label is None:
            return None

        # If br list for the dut's label is empty (that means all brs for
        # this label have been done), return None.
        with self._lock_on(dut.label):
            brl = self._label_brl_map[dut.label]
            if not brl:
                return None
            # Return the first br.
            return brl.pop(0)

    def allocate_label(self, dut):
        """Allocate a label to a dut.

        The work is delegated to MachineImageManager.

        The dut_worker calling this method is responsible for reimage the
        dut to this label.

        Arguments:
          dut: the dut that a new label is to be allocated for.

        Returns:
          The label or None.
        """

        if self._terminated:
            return None

        return self._mim.allocate(dut)

    def dut_worker_finished(self, dut_worker):
        """Notify schedv2 that the dut_worker thread finished.

        Arguments:
          dut_worker: the thread that is about to end.
        """

        self._l.LogOutput("{} finished.".format(dut_worker))
        with self._workers_lock:
            self._active_workers.remove(dut_worker)
            self._finished_workers.append(dut_worker)

    def is_complete(self):
        """Return True when no worker is active any more."""
        return len(self._active_workers) == 0

    def _lock_on(self, object):
        """Return the lock associated with object, creating it on demand."""
        return self._lock_map[object]

    def terminate(self):
        """Mark flag so we stop providing br/reimages.

        Also terminate each DutWorker, so they refuse to execute br or
        reimage.
        """

        self._terminated = True
        # Workers remove themselves from _active_workers as they stop, so
        # iterate over a snapshot to make sure none is skipped.
        active, _ = self._snapshot_workers()
        for dut_worker in active:
            dut_worker.terminate()

    def threads_status_as_string(self):
        """Report the dut worker threads status."""

        active, finished = self._snapshot_workers()
        status = "{} active threads, {} finished threads.\n".format(
            len(active), len(finished))
        status += " Active threads:"
        for dw in active:
            status += '\n ' + dw.status_str()
        if finished:
            status += "\n Finished threads:"
            for dw in finished:
                status += '\n ' + dw.status_str()
        return status


class MockExperimentRunner(ExperimentRunner):
    """Mocked ExperimentRunner for testing."""

    def __init__(self, experiment, json_report=False):
        """Set up the mock runner.

        Args:
          experiment: the experiment object to pretend to run.
          json_report: forwarded to ExperimentRunner, whose constructor
            requires it.
        """
        super(MockExperimentRunner, self).__init__(experiment, json_report)

    def _Run(self, experiment):
        self.l.LogOutput("Would run the following experiment: '%s'." %
                         experiment.name)

    def _PrintTable(self, experiment):
        self.l.LogOutput("Would print the experiment table.")

    def _Email(self, experiment):
        self.l.LogOutput("Would send result email.")

    def _StoreResults(self, experiment):
        self.l.LogOutput("Would store the results.")