experiment_runner.py revision 6736a8e1044141d657aeac5916b32123da664e97
# Copyright 2011-2015 Google Inc. All Rights Reserved.
"""The experiment runner module."""
import getpass
import os
import shutil
import time

import afe_lock_machine
import test_flag

from cros_utils import command_executer
from cros_utils import logger
from cros_utils.email_sender import EmailSender
from cros_utils.file_utils import FileUtils

import config
from experiment_status import ExperimentStatus
from results_cache import CacheConditions
from results_cache import ResultsCache
from results_report import HTMLResultsReport
from results_report import TextResultsReport
from results_report import JSONResultsReport
from schedv2 import Schedv2


class ExperimentRunner(object):
  """ExperimentRunner Class.

  Runs one experiment end to end. It locks the requested machines (unless the
  experiment has a locks_dir), runs the experiment while periodically logging
  its status, and unlocks the machines. It then prints, stores and emails the
  results.
  """

  # Seconds between status log updates while the experiment is running.
  # __init__ overrides this per instance when the log level is not 'verbose'.
  STATUS_TIME_DELAY = 30
  # Seconds to sleep between polls of experiment.IsComplete().
  THREAD_MONITOR_DELAY = 2

  def __init__(self,
               experiment,
               json_report,
               using_schedv2=False,
               log=None,
               cmd_exec=None):
    """Initialize the runner.

    Args:
      experiment: The experiment to run.
      json_report: If true, _StoreResults also emits a JSON results report.
      using_schedv2: If true, use crosperf sched v2 (feature in progress).
      log: Logger to use. Defaults to logger.GetLogger(experiment.log_dir).
      cmd_exec: Command executer to use. Defaults to one built from the logger.
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != 'verbose':
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Return a list of all requested machines.

    Create a list of all the requested machines, both global requests and
    label-specific requests, and return the list.

    Returns:
      The experiment's global machine list (self._experiment.remote) itself,
      not a copy.
    """
    machines = self._experiment.remote
    # All Label.remote is a sublist of experiment.remote.
    for l in self._experiment.labels:
      for r in l.remote:
        assert r in machines
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Update machines lists to contain only locked machines.

    Go through all the lists of requested machines, both global and
    label-specific requests, and remove any machine that we were not
    able to lock.

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    # Filter with slice assignment rather than calling list.remove() while
    # iterating the same list. The old approach skipped the element after
    # each removal, so unlocked machines could remain. Slice assignment also
    # keeps the same list objects, so other holders of these lists see the
    # change.
    remote = self._experiment.remote
    remote[:] = [m for m in remote if m in locked_machines]

    for l in self._experiment.labels:
      l.remote[:] = [m for m in l.remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempt to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.
    In test mode, every requested machine is treated as locked.

    Raises:
      RuntimeError: If no machine could be locked.
    """
    if test_flag.GetTestMode():
      self.locked_machines = self._GetMachineList()
      self._experiment.locked_machines = self.locked_machines
    else:
      lock_mgr = afe_lock_machine.AFELockManager(
          self._GetMachineList(),
          '',
          experiment.labels[0].chromeos_root,
          None,
          log=self.l)
      # Register machines the lock server does not know about before locking.
      for m in lock_mgr.machines:
        if not lock_mgr.MachineIsKnown(m):
          lock_mgr.AddLocalMachine(m)
      machine_states = lock_mgr.GetMachineStates('lock')
      lock_mgr.CheckMachineLocks(machine_states, 'lock')
      self.locked_machines = lock_mgr.UpdateMachines(True)
      self._experiment.locked_machines = self.locked_machines
      self._UpdateMachineList(self.locked_machines)
      self._experiment.machine_manager.RemoveNonLockedMachines(
          self.locked_machines)
      if not self.locked_machines:
        raise RuntimeError('Unable to lock any machines.')

  def _UnlockAllMachines(self, experiment):
    """Attempt to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock all of the machines
    requested for this crosperf run. It does nothing in test mode or when no
    machines were locked.
    """
    if not self.locked_machines or test_flag.GetTestMode():
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        '',
        experiment.labels[0].chromeos_root,
        None,
        log=self.l)
    machine_states = lock_mgr.GetMachineStates('unlock')
    lock_mgr.CheckMachineLocks(machine_states, 'unlock')
    lock_mgr.UpdateMachines(False)

  def _ClearCacheEntries(self, experiment):
    """Delete the results-cache write directory of every benchmark run.

    _Run calls this when CacheConditions.FALSE is among the experiment's
    cache conditions.
    """
    for br in experiment.benchmark_runs:
      cache = ResultsCache()
      cache.Init(br.label.chromeos_image, br.label.chromeos_root,
                 br.benchmark.test_name, br.iteration, br.test_args,
                 br.profiler_args, br.machine_manager, br.machine,
                 br.label.board, br.cache_conditions, br._logger, br.log_level,
                 br.label, br.share_cache, br.benchmark.suite,
                 br.benchmark.show_all_results, br.benchmark.run_local)
      cache_dir = cache.GetCacheDirForWrite()
      if os.path.exists(cache_dir):
        self.l.LogOutput('Removing cache dir: %s' % cache_dir)
        shutil.rmtree(cache_dir)

  def _Run(self, experiment):
    """Run the experiment and block until it completes.

    Machines are locked first unless experiment.locks_dir is set. While the
    experiment runs, this polls experiment.IsComplete() every
    THREAD_MONITOR_DELAY seconds. It logs a status update at most once every
    STATUS_TIME_DELAY seconds. On Ctrl-C or SystemExit the runner is marked
    terminated, experiment.Terminate() is called, and the exception is
    re-raised. Machines locked here are always unlocked on the way out.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      if CacheConditions.FALSE in experiment.cache_conditions:
        self._ClearCacheEntries(experiment)
      status = ExperimentStatus(experiment)
      experiment.Run()
      last_status_time = 0
      last_status_string = ''
      try:
        if experiment.log_level != 'verbose':
          self.l.LogStartDots()
        while not experiment.IsComplete():
          if last_status_time + self.STATUS_TIME_DELAY < time.time():
            last_status_time = time.time()
            border = '=============================='
            if experiment.log_level == 'verbose':
              self.l.LogOutput(border)
              self.l.LogOutput(status.GetProgressString())
              self.l.LogOutput(status.GetStatusString())
              self.l.LogOutput(border)
            else:
              # In non-verbose mode, only print the status when it changed.
              # Otherwise append a progress dot.
              current_status_string = status.GetStatusString()
              if current_status_string != last_status_string:
                self.l.LogEndDots()
                self.l.LogOutput(border)
                self.l.LogOutput(current_status_string)
                self.l.LogOutput(border)
                last_status_string = current_status_string
              else:
                self.l.LogAppendDot()
          time.sleep(self.THREAD_MONITOR_DELAY)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError('Ctrl-c pressed. Cleaning up...')
        experiment.Terminate()
        raise
      except SystemExit:
        self._terminated = True
        self.l.LogError('Unexpected exit. Cleaning up...')
        experiment.Terminate()
        raise
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _PrintTable(self, experiment):
    """Log the text results report for the experiment."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _GetEmailBody(self, experiment):
    """Return the HTML email body: the text report wrapped in a <pre> block.

    Shared by _Email and _StoreResults so both produce the same message body.
    """
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ('\nResults are stored in %s.\n' %
                    experiment.results_directory)
    return "<pre style='font-size: 13px'>%s</pre>" % text_report

  def _Email(self, experiment):
    """Email the results report to the configured recipients and the user.

    Sending is skipped when every benchmark run was a cache hit and no
    explicit recipients were given, or when the 'no_email' config is set.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not benchmark_run.cache_hit
                    for benchmark_run in experiment.benchmark_runs)
    if (not send_mail and not experiment.email_to or
        config.GetConfig('no_email')):
      return

    label_names = [label.name for label in experiment.labels]
    subject = '%s: %s' % (experiment.name, ' vs. '.join(label_names))

    text_report = self._GetEmailBody(experiment)
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment('report.html', html_report)
    # Copy so that adding the current user does not mutate the experiment's
    # own recipient list.
    email_to = list(experiment.email_to or [])
    email_to.append(getpass.getuser())
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type='html')

  def _StoreResults(self, experiment):
    """Write the experiment's artifacts into experiment.results_directory.

    Any existing directory is removed and recreated first. This writes the
    experiment file, the HTML report (and the JSON report when json_report
    is set), the email message body, and each benchmark run's results. Does
    nothing if the run was terminated.
    """
    if self._terminated:
      return
    results_directory = experiment.results_directory
    FileUtils().RmDir(results_directory)
    FileUtils().MkDirP(results_directory)
    self.l.LogOutput('Storing experiment file in %s.' % results_directory)
    experiment_file_path = os.path.join(results_directory, 'experiment.exp')
    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput('Storing results report in %s.' % results_directory)
    results_table_path = os.path.join(results_directory, 'results.html')
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      # Presumably writes its own file(s) under results_directory; verify
      # against JSONResultsReport.
      JSONResultsReport(experiment).GetReport(results_directory)
    FileUtils().WriteFile(results_table_path, report)

    self.l.LogOutput('Storing email message body in %s.' % results_directory)
    msg_file_path = os.path.join(results_directory, 'msg_body.html')
    msg_body = self._GetEmailBody(experiment)
    FileUtils().WriteFile(msg_file_path, msg_body)

    self.l.LogOutput('Storing results of each benchmark run.')
    for benchmark_run in experiment.benchmark_runs:
      if benchmark_run.result:
        # Keep only alphanumeric characters for the directory name. Build a
        # string explicitly: filter() returns an iterator on Python 3, which
        # os.path.join cannot accept.
        benchmark_run_name = ''.join(
            ch for ch in benchmark_run.name if ch.isalnum())
        benchmark_run_path = os.path.join(results_directory, benchmark_run_name)
        benchmark_run.result.CopyResultsTo(benchmark_run_path)
        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Run the experiment, then always print, store and email the results.

    The report is printed even if the run raised. Storing and emailing are
    skipped when the run was terminated.
    """
    try:
      self._Run(self._experiment)
    finally:
      # Always print the report at the end of the run.
      self._PrintTable(self._experiment)
      if not self._terminated:
        self._StoreResults(self._experiment)
        self._Email(self._experiment)


class MockExperimentRunner(ExperimentRunner):
  """Mocked ExperimentRunner for testing.

  Overrides every side-effecting step to only log what it would have done.
  """

  def __init__(self, experiment, json_report):
    super(MockExperimentRunner, self).__init__(experiment, json_report)

  def _Run(self, experiment):
    self.l.LogOutput("Would run the following experiment: '%s'." %
                     experiment.name)

  def _PrintTable(self, experiment):
    self.l.LogOutput('Would print the experiment table.')

  def _Email(self, experiment):
    self.l.LogOutput('Would send result email.')

  def _StoreResults(self, experiment):
    self.l.LogOutput('Would store the results.')