experiment.py revision 51d7a9b5663f6d3067baf06e2fc266265937f61f
#!/usr/bin/python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""The experiment setting module."""

import os
import time

import afe_lock_machine
from threading import Lock

from utils import logger
from utils import misc

import benchmark_run
from machine_manager import BadChecksum
from machine_manager import MachineManager
from machine_manager import MockMachineManager
import test_flag


class Experiment(object):
  """Class representing an Experiment to be run."""

  def __init__(self, name, remote, working_directory,
               chromeos_root, cache_conditions, labels, benchmarks,
               experiment_file, email_to, acquire_timeout, log_dir,
               log_level, share_cache, results_directory, locks_directory):
    """Validate the settings, set up machines and generate benchmark runs.

    Args:
      name: experiment name; the default results directory is
        "<working_directory>/<name>_results".
      remote: host names to run on. Only hosts the machine manager accepts
        are kept in self.remote.
      working_directory: base directory for the default results directory.
      chromeos_root: root passed to the machine manager. If empty, the first
        label with a non-empty chromeos_root is used instead.
      cache_conditions: passed to every BenchmarkRun.
      labels: non-empty list of labels. Each label.remote is narrowed to the
        reachable hosts in self.remote.
      benchmarks: non-empty list of benchmarks.
      experiment_file: stored as-is.
      email_to: stored as-is.
      acquire_timeout: passed to the machine manager.
      log_dir: directory for loggers.
      log_level: passed to the machine manager and every BenchmarkRun.
      share_cache: passed to every BenchmarkRun.
      results_directory: if empty, a default under working_directory is used;
        otherwise it is canonicalized with misc.CanonicalizePath.
      locks_directory: if non-empty, file locking is used; if empty, the AFE
        server locking mechanism is used.

    Raises:
      RuntimeError: if remote, benchmarks or labels is empty, if no
        chromeos_root can be determined, or if no remote host is available.
      BadChecksum: propagated from the second ComputeCommonCheckSum call if it
        raises again after ForceSameImageToAllMachines.
    """
    self.name = name
    self.working_directory = working_directory
    self.remote = remote
    self.chromeos_root = chromeos_root
    self.cache_conditions = cache_conditions
    self.experiment_file = experiment_file
    self.email_to = email_to
    if not results_directory:
      self.results_directory = os.path.join(self.working_directory,
                                            self.name + "_results")
    else:
      self.results_directory = misc.CanonicalizePath(results_directory)
    self.log_dir = log_dir
    self.log_level = log_level
    self.labels = labels
    self.benchmarks = benchmarks
    self.num_complete = 0
    self.num_run_complete = 0
    self.share_cache = share_cache
    # If locks_directory (self.locks_dir) is not blank, we use the file
    # locking mechanism; if it is blank, we use the AFE server locking
    # mechanism.
    self.locks_dir = locks_directory
    # Machines whose AFE locks Cleanup() should release. Starts empty here.
    # NOTE(review): presumably filled in by whoever acquires AFE locks;
    # verify against callers.
    self.locked_machines = []

    if not remote:
      raise RuntimeError("No remote hosts specified")
    if not self.benchmarks:
      raise RuntimeError("No benchmarks specified")
    if not self.labels:
      raise RuntimeError("No labels specified")

    # We need one chromeos_root to run the benchmarks in, but it doesn't
    # matter where it is, unless the ABIs are different.
    # NOTE(review): the fallback below only reaches the machine manager;
    # self.chromeos_root keeps the caller-supplied value.
    if not chromeos_root:
      for label in self.labels:
        if label.chromeos_root:
          chromeos_root = label.chromeos_root
          break
    if not chromeos_root:
      raise RuntimeError("No chromeos_root given and could not determine "
                         "one from the image path.")

    if test_flag.GetTestMode():
      self.machine_manager = MockMachineManager(chromeos_root, acquire_timeout,
                                                log_level, locks_directory)
    else:
      self.machine_manager = MachineManager(chromeos_root, acquire_timeout,
                                            log_level, locks_directory)
    self.l = logger.GetLogger(log_dir)

    for machine in self.remote:
      # machine_manager.AddMachine only adds reachable machines.
      self.machine_manager.AddMachine(machine)
    # Now machine_manager._all_machines contains a list of reachable
    # machines. This is a subset of self.remote. We make both lists the same.
    self.remote = [m.name for m in self.machine_manager._all_machines]
    if not self.remote:
      raise RuntimeError("No machine available for running experiment.")

    for label in labels:
      # Drop label remotes that are not reachable (not in self.remote), so
      # each label.remote is a sublist of experiment.remote. A list
      # comprehension (not filter()) always yields a real list, which stays
      # reusable on Python 3 where filter() returns a one-shot iterator.
      label.remote = [r for r in label.remote if r in self.remote]
      try:
        self.machine_manager.ComputeCommonCheckSum(label)
      except BadChecksum:
        # Force the same image on all machines, then compute the checksum
        # again. There is no further retry: if this second call raises
        # BadChecksum, it propagates out of __init__.
        self.machine_manager.ForceSameImageToAllMachines(label)
        self.machine_manager.ComputeCommonCheckSum(label)

      self.machine_manager.ComputeCommonCheckSumString(label)

    self.start_time = None
    self.benchmark_runs = self._GenerateBenchmarkRuns()

    self._schedv2 = None
    # Guards num_complete / num_run_complete in BenchmarkRunFinished, which
    # its docstring says is called from multiple threads.
    self._internal_counter_lock = Lock()

  def set_schedv2(self, schedv2):
    """Attach a schedv2 scheduler. Run/Terminate/IsComplete then delegate to it."""
    self._schedv2 = schedv2

  def schedv2(self):
    """Return the attached schedv2 scheduler, or None if none was set."""
    return self._schedv2

  def _GenerateBenchmarkRuns(self):
    """Generate benchmark runs from labels and benchmark definitions.

    One BenchmarkRun is created for every (label, benchmark, iteration)
    combination, with iterations numbered 1..benchmark.iterations inclusive.
    Each run gets its own logger.Logger in self.log_dir, named
    "run.<label>_<benchmark>_<iteration>".

    Returns:
      A list of benchmark_run.BenchmarkRun objects.
    """
    benchmark_runs = []
    for label in self.labels:
      for benchmark in self.benchmarks:
        for iteration in range(1, benchmark.iterations + 1):

          benchmark_run_name = "%s: %s (%s)" % (label.name, benchmark.name,
                                                iteration)
          full_name = "%s_%s_%s" % (label.name, benchmark.name, iteration)
          logger_to_use = logger.Logger(self.log_dir,
                                        "run.%s" % (full_name),
                                        True)
          benchmark_runs.append(benchmark_run.BenchmarkRun(
              benchmark_run_name,
              benchmark,
              label,
              iteration,
              self.cache_conditions,
              self.machine_manager,
              logger_to_use,
              self.log_level,
              self.share_cache))

    return benchmark_runs

  def Build(self):
    """Currently a no-op."""
    pass

  def Terminate(self):
    """Stop the experiment.

    With a schedv2 scheduler attached, delegate to its terminate(). Otherwise
    log and call Terminate() on every benchmark run thread that is still alive.
    """
    if self._schedv2 is not None:
      self._schedv2.terminate()
    else:
      for t in self.benchmark_runs:
        # is_alive() exists since Python 2.6; isAlive() was removed in 3.9.
        if t.is_alive():
          self.l.LogError("Terminating run: '%s'." % t.name)
          t.Terminate()

  def IsComplete(self):
    """Poll for completion of the experiment.

    With schedv2, return the scheduler's is_complete(). Otherwise poll the
    threads started by Run(). Each finished thread is counted in num_complete
    (and, unless it was a cache hit, in num_run_complete) and then removed from
    active_threads.

    Returns False whenever active_threads was non-empty on entry, even if this
    call removed the last thread; a later call then returns True. In the
    non-schedv2 path Run() must have been called first, because it creates
    self.active_threads.
    """
    if self._schedv2:
      return self._schedv2.is_complete()
    if self.active_threads:
      # Iterate over a snapshot: removing items from the list being iterated
      # would silently skip the thread that follows each removed one.
      for t in list(self.active_threads):
        if t.is_alive():
          t.join(0)
        if not t.is_alive():
          self.num_complete += 1
          if not t.cache_hit:
            self.num_run_complete += 1
          self.active_threads.remove(t)
      return False
    return True

  def BenchmarkRunFinished(self, br):
    """Update internal counters after br finishes.

    Note this is only used by schedv2 and is called by multiple threads.
    Never throw any exception here.
    """

    # NOTE(review): despite "never throw", this raises AssertionError (when
    # asserts are enabled) if no schedv2 scheduler is attached.
    assert self._schedv2 is not None
    with self._internal_counter_lock:
      self.num_complete += 1
      if not br.cache_hit:
        self.num_run_complete += 1

  def Run(self):
    """Start the experiment and record the start time.

    With schedv2, hand control to the scheduler's run_sched(). Otherwise start
    every benchmark run as a daemon thread and track it in self.active_threads
    so IsComplete() can poll it.
    """
    self.start_time = time.time()
    if self._schedv2 is not None:
      self._schedv2.run_sched()
    else:
      self.active_threads = []
      # "br" avoids shadowing the imported benchmark_run module.
      for br in self.benchmark_runs:
        # Set threads to daemon so program exits when ctrl-c is pressed.
        br.daemon = True
        br.start()
        self.active_threads.append(br)

  def SetCacheConditions(self, cache_conditions):
    """Apply cache_conditions to every benchmark run.

    Note: self.cache_conditions itself is not updated.
    """
    for br in self.benchmark_runs:
      br.SetCacheConditions(cache_conditions)

  def Cleanup(self):
    """Make sure all machines are unlocked.

    With file locks (self.locks_dir set), delegate to machine_manager.Cleanup().
    Otherwise, create an AFE lock manager for self.locked_machines and release
    the AFE lock of every machine whose state reports it as locked. Does
    nothing in the AFE case when self.locked_machines is empty.
    """
    if self.locks_dir:
      # We are using the file locks mechanism, so call machine_manager.Cleanup
      # to unlock everything.
      self.machine_manager.Cleanup()
    else:
      all_machines = self.locked_machines
      if not all_machines:
        return

      # If we locked any machines earlier, make sure we unlock them now.
      lock_mgr = afe_lock_machine.AFELockManager(all_machines, "",
                                                 self.labels[0].chromeos_root,
                                                 None)
      machine_states = lock_mgr.GetMachineStates("unlock")
      # items() works on both Python 2 and 3; iteritems() is Python 2 only.
      for k, state in machine_states.items():
        if state["locked"]:
          lock_mgr.UpdateLockInAFE(False, k)