experiment_runner.py revision 45b53c5fee4c044cebd74dbbbd6119fbd51554e7
#!/usr/bin/python

# Copyright 2011-2015 Google Inc. All Rights Reserved.

"""The experiment runner module."""
import getpass
import os
import random
import shutil
import sys
import time

import afe_lock_machine

from utils import command_executer
from utils import logger
from utils.email_sender import EmailSender
from utils.file_utils import FileUtils

import config
from experiment_status import ExperimentStatus
from results_cache import CacheConditions
from results_cache import ResultsCache
from results_report import HTMLResultsReport
from results_report import TextResultsReport
from results_report import JSONResultsReport
from schedv2 import Schedv2


class ExperimentRunner(object):
  """Runs an experiment, then prints, stores and emails its results.

  When the experiment has no locks_dir, machines are locked/unlocked through
  the AFE server around the run. While the experiment runs, its status is
  logged periodically.
  """

  STATUS_TIME_DELAY = 30
  THREAD_MONITOR_DELAY = 2

  def __init__(self, experiment, json_report, using_schedv2=False, log=None,
               cmd_exec=None):
    """Create a runner for `experiment`.

    Args:
      experiment: The experiment to run.
      json_report: If true, _StoreResults also writes a JSON results report.
      using_schedv2: If true, use crosperf sched v2 (feature in progress).
      log: Logger to use; defaults to logger.GetLogger(experiment.log_dir).
      cmd_exec: Command executer; defaults to one built on the logger.
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != "verbose":
      # Outside verbose mode, check for status changes more often.
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Return a list of all requested machines.

    Label-specific requests must be a subset of the global request list, so
    experiment.remote alone covers every requested machine.

    Returns:
      experiment.remote itself (the same list object, not a copy).
    """
    machines = self._experiment.remote
    # All Label.remote is a sublist of experiment.remote.
    for label in self._experiment.labels:
      for remote in label.remote:
        assert remote in machines
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Update machine lists to contain only locked machines.

    Go through all the lists of requested machines, both global and
    label-specific requests, and remove any machine that we were not
    able to lock. The lists are filtered in place, so other holders of
    references to them see the update.

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    # Slice assignment filters in place. Calling list.remove() while
    # iterating the same list skips the element after each removal, which
    # left some unlocked machines behind.
    remote = self._experiment.remote
    remote[:] = [m for m in remote if m in locked_machines]

    for label in self._experiment.labels:
      label.remote[:] = [m for m in label.remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempt to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.
    Machine lists are then trimmed to the machines actually locked.

    Raises:
      RuntimeError: if no machine could be locked.
    """
    lock_mgr = afe_lock_machine.AFELockManager(
        self._GetMachineList(),
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    # Machines unknown to the lock manager are registered as local machines.
    for m in lock_mgr.machines:
      if not lock_mgr.MachineIsKnown(m):
        lock_mgr.AddLocalMachine(m)
    machine_states = lock_mgr.GetMachineStates("lock")
    lock_mgr.CheckMachineLocks(machine_states, "lock")
    self.locked_machines = lock_mgr.UpdateMachines(True)
    self._experiment.locked_machines = self.locked_machines
    self._UpdateMachineList(self.locked_machines)
    self._experiment.machine_manager.RemoveNonLockedMachines(
        self.locked_machines)
    if not self.locked_machines:
      raise RuntimeError("Unable to lock any machines.")

  def _UnlockAllMachines(self, experiment):
    """Attempt to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock all of the machines
    this runner locked. It does nothing if no machines were locked.
    """
    if not self.locked_machines:
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    machine_states = lock_mgr.GetMachineStates("unlock")
    lock_mgr.CheckMachineLocks(machine_states, "unlock")
    lock_mgr.UpdateMachines(False)

  def _ClearCacheEntries(self, experiment):
    """Delete the existing cache directory of every benchmark run, if any."""
    for br in experiment.benchmark_runs:
      cache = ResultsCache()
      cache.Init(br.label.chromeos_image, br.label.chromeos_root,
                 br.benchmark.test_name, br.iteration, br.test_args,
                 br.profiler_args, br.machine_manager, br.machine,
                 br.label.board, br.cache_conditions, br._logger, br.log_level,
                 br.label, br.share_cache, br.benchmark.suite,
                 br.benchmark.show_all_results, br.benchmark.run_local)
      cache_dir = cache._GetCacheDirForWrite()
      if os.path.exists(cache_dir):
        self.l.LogOutput("Removing cache dir: %s" % cache_dir)
        shutil.rmtree(cache_dir)

  def _Run(self, experiment):
    """Run `experiment` to completion, logging its status periodically.

    AFE locking is skipped when experiment.locks_dir is set. Machines locked
    here are always unlocked on exit. On Ctrl-C or SystemExit the experiment
    is terminated, self._terminated is set and the exception is re-raised.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      if CacheConditions.FALSE in experiment.cache_conditions:
        self._ClearCacheEntries(experiment)
      status = ExperimentStatus(experiment)
      experiment.Run()
      last_status_time = 0
      last_status_string = ""
      try:
        if experiment.log_level != "verbose":
          self.l.LogStartDots()
        while not experiment.IsComplete():
          if last_status_time + self.STATUS_TIME_DELAY < time.time():
            last_status_time = time.time()
            border = "=============================="
            if experiment.log_level == "verbose":
              self.l.LogOutput(border)
              self.l.LogOutput(status.GetProgressString())
              self.l.LogOutput(status.GetStatusString())
              self.l.LogOutput(border)
            else:
              # Only print the status when it changed; otherwise append a
              # dot to the current line.
              current_status_string = status.GetStatusString()
              if current_status_string != last_status_string:
                self.l.LogEndDots()
                self.l.LogOutput(border)
                self.l.LogOutput(current_status_string)
                self.l.LogOutput(border)
                last_status_string = current_status_string
              else:
                self.l.LogAppendDot()
          time.sleep(self.THREAD_MONITOR_DELAY)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError("Ctrl-c pressed. Cleaning up...")
        experiment.Terminate()
        raise
      except SystemExit:
        # Was previously misspelled as `_terminate`, so Run() still stored
        # and emailed results of an aborted experiment.
        self._terminated = True
        self.l.LogError("Unexpected exit. Cleaning up...")
        experiment.Terminate()
        raise
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _PrintTable(self, experiment):
    """Log the text results report."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _MakeMessageBody(self, experiment):
    """Return the HTML message body: the text report plus results location."""
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    return "<pre style='font-size: 13px'>%s</pre>" % text_report

  def _Email(self, experiment):
    """Email the results to the current user plus experiment.email_to.

    Nothing is sent if the "no_email" config option is set, or if every
    benchmark run was a cache hit and experiment.email_to is empty.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not br.cache_hit for br in experiment.benchmark_runs)
    if ((not send_mail and not experiment.email_to) or
        config.GetConfig("no_email")):
      return

    subject = "%s: %s" % (experiment.name,
                          " vs. ".join(label.name
                                       for label in experiment.labels))

    text_report = self._MakeMessageBody(experiment)
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment("report.html", html_report)
    email_to = [getpass.getuser()] + experiment.email_to
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type="html")

  def _StoreResults(self, experiment):
    """Write the experiment's results into experiment.results_directory.

    The directory is removed and recreated first. Does nothing if the run
    was terminated.
    """
    if self._terminated:
      return
    results_directory = experiment.results_directory
    FileUtils().RmDir(results_directory)
    FileUtils().MkDirP(results_directory)
    self.l.LogOutput("Storing experiment file in %s." % results_directory)
    experiment_file_path = os.path.join(results_directory,
                                        "experiment.exp")
    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput("Storing results report in %s." % results_directory)
    results_table_path = os.path.join(results_directory, "results.html")
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      JSONResultsReport(experiment).GetReport(results_directory)
    FileUtils().WriteFile(results_table_path, report)

    self.l.LogOutput("Storing email message body in %s." % results_directory)
    msg_file_path = os.path.join(results_directory, "msg_body.html")
    FileUtils().WriteFile(msg_file_path, self._MakeMessageBody(experiment))

    self.l.LogOutput("Storing results of each benchmark run.")
    for benchmark_run in experiment.benchmark_runs:
      if benchmark_run.result:
        # Keep only alphanumeric characters of the run name. "".join is used
        # because filter() returns an iterator (not a str) on Python 3.
        benchmark_run_name = "".join(
            c for c in benchmark_run.name if c.isalnum())
        benchmark_run_path = os.path.join(results_directory,
                                          benchmark_run_name)
        benchmark_run.result.CopyResultsTo(benchmark_run_path)
        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Run the experiment; always print the report, then store/email it.

    Results are only stored and emailed if the run was not terminated.
    """
    try:
      self._Run(self._experiment)
    finally:
      # Always print the report at the end of the run.
      self._PrintTable(self._experiment)
      if not self._terminated:
        self._StoreResults(self._experiment)
        self._Email(self._experiment)


class MockExperimentRunner(ExperimentRunner):
  """Mocked ExperimentRunner for testing."""

  def __init__(self, experiment, json_report=False):
    # The base class requires json_report; omitting it raised TypeError.
    super(MockExperimentRunner, self).__init__(experiment, json_report)

  def _Run(self, experiment):
    self.l.LogOutput("Would run the following experiment: '%s'." %
                     experiment.name)

  def _PrintTable(self, experiment):
    self.l.LogOutput("Would print the experiment table.")

  def _Email(self, experiment):
    self.l.LogOutput("Would send result email.")

  def _StoreResults(self, experiment):
    self.l.LogOutput("Would store the results.")