experiment_runner.py revision f9b503542669e34f83e5f6bfd3c4e19a0c43e015
#!/usr/bin/python

# Copyright 2011-2015 Google Inc. All Rights Reserved.

"""The experiment runner module."""
import getpass
import os
import random
import shutil
import sys
import time

import afe_lock_machine

from utils import command_executer
from utils import logger
from utils.email_sender import EmailSender
from utils.file_utils import FileUtils

import config
from experiment_status import ExperimentStatus
from results_cache import CacheConditions
from results_cache import ResultsCache
from results_report import HTMLResultsReport
from results_report import TextResultsReport
from results_report import JSONResultsReport
from schedv2 import Schedv2


class ExperimentRunner(object):
  """Drives one experiment: lock machines, run, report, store and email."""

  # Seconds between status reports; lowered to 10 in __init__ for
  # non-verbose log levels.
  STATUS_TIME_DELAY = 30
  # Seconds to sleep between polls of experiment.IsComplete().
  THREAD_MONITOR_DELAY = 2

  def __init__(self, experiment, json_report, using_schedv2=False, log=None,
               cmd_exec=None):
    """Set up the runner.

    Args:
      experiment: The experiment object to run.
      json_report: If true, _StoreResults also generates a JSON report.
      using_schedv2: If true, a Schedv2 instance is attached to the
        experiment before it runs.
      log: Logger to use; defaults to logger.GetLogger(experiment.log_dir).
      cmd_exec: Command executer to use; defaults to
        command_executer.GetCommandExecuter(self.l).
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != "verbose":
      # Instance attribute shadows the class-level default.
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Return a list of all requested machines.

    Create a list of all the requested machines, both global requests and
    label-specific requests, and return the list.

    Returns:
      The experiment's own `remote` list (not a copy).
    """
    machines = self._experiment.remote
    # All Label.remote is a sublist of experiment.remote.
    for label in self._experiment.labels:
      for r in label.remote:
        assert r in machines
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Update machines lists to contain only locked machines.

    Go through all the lists of requested machines, both global and
    label-specific requests, and remove any machine that we were not
    able to lock.

    The lists are filtered in place via slice assignment.  Calling
    list.remove() while iterating over the same list skips the element
    that follows each removed one, which could leave unlocked machines
    behind.

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    remote = self._experiment.remote
    remote[:] = [m for m in remote if m in locked_machines]

    for label in self._experiment.labels:
      label.remote[:] = [m for m in label.remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempt to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.

    Args:
      experiment: The experiment whose machines are to be locked.

    Raises:
      RuntimeError: If no machine could be locked.
    """
    lock_mgr = afe_lock_machine.AFELockManager(
        self._GetMachineList(),
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    for m in lock_mgr.machines:
      if not lock_mgr.MachineIsKnown(m):
        lock_mgr.AddLocalMachine(m)
    machine_states = lock_mgr.GetMachineStates("lock")
    lock_mgr.CheckMachineLocks(machine_states, "lock")
    self.locked_machines = lock_mgr.UpdateMachines(True)
    self._experiment.locked_machines = self.locked_machines
    self._UpdateMachineList(self.locked_machines)
    self._experiment.machine_manager.RemoveNonLockedMachines(
        self.locked_machines)
    if not self.locked_machines:
      raise RuntimeError("Unable to lock any machines.")

  def _UnlockAllMachines(self, experiment):
    """Attempt to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock all of the machines
    requested for this crosperf run.  Does nothing if no machine is
    recorded as locked.

    Args:
      experiment: The experiment whose machines are to be unlocked.
    """
    if not self.locked_machines:
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    machine_states = lock_mgr.GetMachineStates("unlock")
    lock_mgr.CheckMachineLocks(machine_states, "unlock")
    lock_mgr.UpdateMachines(False)

  def _ClearCacheEntries(self, experiment):
    """Remove the on-disk cache directory of every benchmark run.

    Args:
      experiment: The experiment whose benchmark runs are examined.
    """
    for br in experiment.benchmark_runs:
      cache = ResultsCache()
      cache.Init(br.label.chromeos_image, br.label.chromeos_root,
                 br.benchmark.test_name, br.iteration, br.test_args,
                 br.profiler_args, br.machine_manager, br.machine,
                 br.label.board, br.cache_conditions, br._logger,
                 br.log_level, br.label, br.share_cache, br.benchmark.suite,
                 br.benchmark.show_all_results, br.benchmark.run_local)
      cache_dir = cache._GetCacheDirForWrite()
      if os.path.exists(cache_dir):
        self.l.LogOutput("Removing cache dir: %s" % cache_dir)
        shutil.rmtree(cache_dir)

  def _Run(self, experiment):
    """Run the experiment and report its status until it completes.

    Machines are locked through AFE first unless experiment.locks_dir is
    set, and unlocked again in all cases.  Ctrl-c marks the runner as
    terminated and terminates the experiment.

    Args:
      experiment: The experiment to run.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      if CacheConditions.FALSE in experiment.cache_conditions:
        self._ClearCacheEntries(experiment)
      status = ExperimentStatus(experiment)
      experiment.Run()
      last_status_time = 0
      last_status_string = ""
      border = "=============================="
      try:
        if experiment.log_level != "verbose":
          self.l.LogStartDots()
        while not experiment.IsComplete():
          if last_status_time + self.STATUS_TIME_DELAY < time.time():
            last_status_time = time.time()
            if experiment.log_level == "verbose":
              self.l.LogOutput(border)
              self.l.LogOutput(status.GetProgressString())
              self.l.LogOutput(status.GetStatusString())
              self.l.LogOutput(border)
            else:
              # Only print the status when it changed; otherwise just
              # append a progress dot.
              current_status_string = status.GetStatusString()
              if current_status_string != last_status_string:
                self.l.LogEndDots()
                self.l.LogOutput(border)
                self.l.LogOutput(current_status_string)
                self.l.LogOutput(border)
                last_status_string = current_status_string
              else:
                self.l.LogAppendDot()
          time.sleep(self.THREAD_MONITOR_DELAY)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError("Ctrl-c pressed. Cleaning up...")
        experiment.Terminate()
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _PrintTable(self, experiment):
    """Log the text results report for the experiment."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _Email(self, experiment):
    """Email the text and HTML reports for the experiment.

    No mail is sent when the "no_email" config option is set, or when
    every benchmark run was a cache hit and the experiment names no
    explicit recipients.

    Args:
      experiment: The experiment to report on.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not benchmark_run.cache_hit
                    for benchmark_run in experiment.benchmark_runs)
    if ((not send_mail and not experiment.email_to) or
        config.GetConfig("no_email")):
      return

    label_names = [label.name for label in experiment.labels]
    subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    text_report = "<pre style='font-size: 13px'>%s</pre>" % text_report
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment("report.html", html_report)
    # The current user is always a recipient; tolerate an unset email_to.
    email_to = [getpass.getuser()] + list(experiment.email_to or [])
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type="html")

  def _StoreResults(self, experiment):
    """Write the experiment file, reports and per-run results to disk.

    The results directory is removed and recreated first.  Nothing is
    stored if the run was terminated.

    Args:
      experiment: The experiment whose results are stored.
    """
    if self._terminated:
      return
    results_directory = experiment.results_directory
    FileUtils().RmDir(results_directory)
    FileUtils().MkDirP(results_directory)
    self.l.LogOutput("Storing experiment file in %s." % results_directory)
    experiment_file_path = os.path.join(results_directory,
                                        "experiment.exp")
    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput("Storing results report in %s." % results_directory)
    results_table_path = os.path.join(results_directory, "results.html")
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      JSONResultsReport(experiment).GetReport(results_directory)
    FileUtils().WriteFile(results_table_path, report)

    self.l.LogOutput("Storing email message body in %s." % results_directory)
    msg_file_path = os.path.join(results_directory, "msg_body.html")
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    msg_body = "<pre style='font-size: 13px'>%s</pre>" % text_report
    FileUtils().WriteFile(msg_file_path, msg_body)

    self.l.LogOutput("Storing results of each benchmark run.")
    for benchmark_run in experiment.benchmark_runs:
      if benchmark_run.result:
        # Keep only alphanumeric characters for the directory name.  A
        # join is used instead of filter(str.isalnum, ...) so the result
        # is always a string, whatever the string type or Python version.
        benchmark_run_name = "".join(
            c for c in benchmark_run.name if c.isalnum())
        benchmark_run_path = os.path.join(results_directory,
                                          benchmark_run_name)
        benchmark_run.result.CopyResultsTo(benchmark_run_path)
        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Run the experiment, print the table, then store and email results.

    Storing and emailing are skipped if the run was terminated.
    """
    self._Run(self._experiment)
    self._PrintTable(self._experiment)
    if not self._terminated:
      self._StoreResults(self._experiment)
      self._Email(self._experiment)


class MockExperimentRunner(ExperimentRunner):
  """Mocked ExperimentRunner for testing."""

  def __init__(self, experiment, json_report=False):
    """Set up the mock runner.

    Args:
      experiment: The experiment that would be run.
      json_report: Forwarded to ExperimentRunner, whose constructor
        requires it; defaults to False so existing one-argument callers
        keep working.
    """
    super(MockExperimentRunner, self).__init__(experiment, json_report)

  def _Run(self, experiment):
    """Log what would be run instead of running it."""
    self.l.LogOutput("Would run the following experiment: '%s'." %
                     experiment.name)

  def _PrintTable(self, experiment):
    """Log that the table would be printed."""
    self.l.LogOutput("Would print the experiment table.")

  def _Email(self, experiment):
    """Log that the result email would be sent."""
    self.l.LogOutput("Would send result email.")

  def _StoreResults(self, experiment):
    """Log that the results would be stored."""
    self.l.LogOutput("Would store the results.")