experiment_runner.py revision 6e8726de19436e28c4d22c3f3dc3bbae87dd4c12
# Copyright 2011-2015 Google Inc. All Rights Reserved.

"""The experiment runner module."""
import getpass
import os
import shutil
import time

import afe_lock_machine

from cros_utils import command_executer
from cros_utils import logger
from cros_utils.email_sender import EmailSender
from cros_utils.file_utils import FileUtils

import config
from experiment_status import ExperimentStatus
from results_cache import CacheConditions
from results_cache import ResultsCache
from results_report import HTMLResultsReport
from results_report import TextResultsReport
from results_report import JSONResultsReport
from schedv2 import Schedv2


class ExperimentRunner(object):
  """Drives one crosperf experiment from machine locking to reporting.

  Run() executes the experiment and then always prints the text results
  table. Unless the run was interrupted (Ctrl-C / SystemExit), it also
  stores the results on disk and sends the result email.
  """

  # Seconds between status reports while waiting for the experiment to
  # finish. __init__ overrides this to 10 on the instance when the
  # experiment's log level is not "verbose".
  STATUS_TIME_DELAY = 30
  # Seconds to sleep between polls of experiment.IsComplete().
  THREAD_MONITOR_DELAY = 2

  def __init__(self, experiment, json_report, using_schedv2=False, log=None,
               cmd_exec=None):
    """Create a runner for `experiment`.

    Args:
      experiment: The experiment to run.
      json_report: If true, _StoreResults also writes a JSON results report.
      using_schedv2: If true, install a Schedv2 scheduler on the experiment
        before running it (feature in progress).
      log: Logger to use; defaults to logger.GetLogger(experiment.log_dir).
      cmd_exec: Command executer; defaults to
        command_executer.GetCommandExecuter(log).
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    # Set when the run is interrupted; suppresses storing results and email.
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != "verbose":
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Return a list of all requested machines.

    Returns experiment.remote itself (not a copy). Each label's remote list
    is asserted to be a subset of it, so it covers both global and
    label-specific requests.
    """
    machines = self._experiment.remote
    # All Label.remote is a sublist of experiment.remote.
    for label in self._experiment.labels:
      for machine in label.remote:
        assert machine in machines
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Update machine lists to contain only locked machines.

    Go through all the lists of requested machines, both global and
    label-specific requests, and remove any machine that we were not
    able to lock. The lists are filtered in place, so other holders of
    references to them (e.g. callers of _GetMachineList) see the update.

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    # Rebuild each list rather than calling remove() while iterating it:
    # removing during iteration skips the element after every removed one,
    # which left some unlocked machines in the lists.
    remote = self._experiment.remote
    remote[:] = [m for m in remote if m in locked_machines]

    for label in self._experiment.labels:
      label.remote[:] = [m for m in label.remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempt to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.
    Machines that could not be locked are dropped from the experiment's
    machine lists and from its machine manager.

    Args:
      experiment: The experiment whose machines should be locked.

    Raises:
      RuntimeError: If no machine could be locked.
    """
    lock_mgr = afe_lock_machine.AFELockManager(
        self._GetMachineList(),
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    # Machines the lock manager does not recognize are added as local ones.
    for m in lock_mgr.machines:
      if not lock_mgr.MachineIsKnown(m):
        lock_mgr.AddLocalMachine(m)
    machine_states = lock_mgr.GetMachineStates("lock")
    lock_mgr.CheckMachineLocks(machine_states, "lock")
    self.locked_machines = lock_mgr.UpdateMachines(True)
    self._experiment.locked_machines = self.locked_machines
    self._UpdateMachineList(self.locked_machines)
    self._experiment.machine_manager.RemoveNonLockedMachines(
        self.locked_machines)
    if not self.locked_machines:
      raise RuntimeError("Unable to lock any machines.")

  def _UnlockAllMachines(self, experiment):
    """Attempt to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock all of the machines
    that _LockAllMachines locked for this crosperf run. It does nothing if
    no machines were locked.

    Args:
      experiment: The experiment whose machines should be unlocked.
    """
    if not self.locked_machines:
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    machine_states = lock_mgr.GetMachineStates("unlock")
    lock_mgr.CheckMachineLocks(machine_states, "unlock")
    lock_mgr.UpdateMachines(False)

  def _ClearCacheEntries(self, experiment):
    """Delete the on-disk results-cache directory of every benchmark run.

    For each benchmark run, a ResultsCache is initialized with that run's
    parameters to compute its cache write directory, and the directory is
    removed if it exists.

    Args:
      experiment: The experiment whose benchmark runs' caches are cleared.
    """
    for br in experiment.benchmark_runs:
      cache = ResultsCache()
      cache.Init(br.label.chromeos_image, br.label.chromeos_root,
                 br.benchmark.test_name, br.iteration, br.test_args,
                 br.profiler_args, br.machine_manager, br.machine,
                 br.label.board, br.cache_conditions, br._logger, br.log_level,
                 br.label, br.share_cache, br.benchmark.suite,
                 br.benchmark.show_all_results, br.benchmark.run_local)
      cache_dir = cache._GetCacheDirForWrite()
      if os.path.exists(cache_dir):
        self.l.LogOutput("Removing cache dir: %s" % cache_dir)
        shutil.rmtree(cache_dir)

  def _Run(self, experiment):
    """Run `experiment` and block until it completes, logging progress.

    Machines are locked first (unless the experiment has a locks_dir) and
    are always unlocked again on exit. On Ctrl-C or SystemExit the run is
    marked terminated, the experiment is terminated, and the exception is
    re-raised.

    Args:
      experiment: The experiment to run.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      # A FALSE cache condition means cached results must not be reused.
      if CacheConditions.FALSE in experiment.cache_conditions:
        self._ClearCacheEntries(experiment)
      status = ExperimentStatus(experiment)
      experiment.Run()
      last_status_time = 0
      last_status_string = ""
      try:
        if experiment.log_level != "verbose":
          self.l.LogStartDots()
        while not experiment.IsComplete():
          if last_status_time + self.STATUS_TIME_DELAY < time.time():
            last_status_time = time.time()
            border = "=============================="
            if experiment.log_level == "verbose":
              self.l.LogOutput(border)
              self.l.LogOutput(status.GetProgressString())
              self.l.LogOutput(status.GetStatusString())
              self.l.LogOutput(border)
            else:
              # Non-verbose mode: print the status only when it changed,
              # otherwise append a progress dot.
              current_status_string = status.GetStatusString()
              if current_status_string != last_status_string:
                self.l.LogEndDots()
                self.l.LogOutput(border)
                self.l.LogOutput(current_status_string)
                self.l.LogOutput(border)
                last_status_string = current_status_string
              else:
                self.l.LogAppendDot()
          time.sleep(self.THREAD_MONITOR_DELAY)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError("Ctrl-c pressed. Cleaning up...")
        experiment.Terminate()
        raise
      except SystemExit:
        self._terminated = True
        self.l.LogError("Unexpected exit. Cleaning up...")
        experiment.Terminate()
        raise
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _PrintTable(self, experiment):
    """Log the text results report for `experiment`."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _GetEmailBody(self, experiment):
    """Return the HTML message body shared by the email and stored copy.

    The body is the email-style text report, followed by a line naming the
    results directory, wrapped in a <pre> block.

    Args:
      experiment: The experiment to report on.
    """
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    return "<pre style='font-size: 13px'>%s</pre>" % text_report

  def _Email(self, experiment):
    """Email the results report to the current user and experiment.email_to.

    Email is skipped when the "no_email" config option is set, or when every
    benchmark run was a cache hit and no explicit recipients were given.

    Args:
      experiment: The experiment to report on.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not benchmark_run.cache_hit
                    for benchmark_run in experiment.benchmark_runs)
    if ((not send_mail and not experiment.email_to) or
        config.GetConfig("no_email")):
      return

    label_names = [label.name for label in experiment.labels]
    subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

    text_report = self._GetEmailBody(experiment)
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment("report.html", html_report)
    email_to = [getpass.getuser()] + experiment.email_to
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type="html")

  def _StoreResults(self, experiment):
    """Write the experiment file, reports and per-run results to disk.

    The results directory is deleted and recreated first. Nothing is written
    if the run was terminated.

    Args:
      experiment: The experiment whose results are stored.
    """
    if self._terminated:
      return
    results_directory = experiment.results_directory
    FileUtils().RmDir(results_directory)
    FileUtils().MkDirP(results_directory)
    self.l.LogOutput("Storing experiment file in %s." % results_directory)
    experiment_file_path = os.path.join(results_directory,
                                        "experiment.exp")
    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput("Storing results report in %s." % results_directory)
    results_table_path = os.path.join(results_directory, "results.html")
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      JSONResultsReport(experiment).GetReport(results_directory)
    FileUtils().WriteFile(results_table_path, report)

    self.l.LogOutput("Storing email message body in %s." % results_directory)
    msg_file_path = os.path.join(results_directory, "msg_body.html")
    FileUtils().WriteFile(msg_file_path, self._GetEmailBody(experiment))

    self.l.LogOutput("Storing results of each benchmark run.")
    for benchmark_run in experiment.benchmark_runs:
      if benchmark_run.result:
        # Keep only alphanumeric characters for the directory name. An
        # explicit join yields a string for both str and unicode names,
        # unlike filter(str.isalnum, ...).
        benchmark_run_name = "".join(
            c for c in benchmark_run.name if c.isalnum())
        benchmark_run_path = os.path.join(results_directory,
                                          benchmark_run_name)
        benchmark_run.result.CopyResultsTo(benchmark_run_path)
        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Run the experiment, then print and (unless interrupted) publish it.

    The results table is printed even if the run raises; results are stored
    and emailed only when the run was not terminated.
    """
    try:
      self._Run(self._experiment)
    finally:
      # Always print the report at the end of the run.
      self._PrintTable(self._experiment)
      if not self._terminated:
        self._StoreResults(self._experiment)
        self._Email(self._experiment)


class MockExperimentRunner(ExperimentRunner):
  """Mocked ExperimentRunner for testing.

  Every step only logs what it would have done.
  """

  def __init__(self, experiment, json_report):
    super(MockExperimentRunner, self).__init__(experiment, json_report)

  def _Run(self, experiment):
    self.l.LogOutput("Would run the following experiment: '%s'." %
                     experiment.name)

  def _PrintTable(self, experiment):
    self.l.LogOutput("Would print the experiment table.")

  def _Email(self, experiment):
    self.l.LogOutput("Would send result email.")

  def _StoreResults(self, experiment):
    self.l.LogOutput("Would store the results.")