schedv2.py revision eddb06396dc17fcec418c3aadc234f737b63c876
# Copyright 2015 Google Inc. All Rights Reserved.
"""Module to optimize the scheduling of benchmark_run tasks."""


from __future__ import print_function

import sys
import test_flag
import traceback

from collections import defaultdict
from machine_image_manager import MachineImageManager
from threading import Lock
from threading import Thread
from cros_utils import command_executer
from cros_utils import logger


class DutWorker(Thread):
  """Working thread for a dut.

  Each DutWorker owns one dut. It first drains the scheduler's list of
  cache-hit benchmark_runs, then repeatedly asks the scheduler for
  benchmark_runs matching the dut's current label, reimaging the dut onto a
  newly allocated label whenever its own label has no work left.
  """

  def __init__(self, dut, sched):
    super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
    self._dut = dut
    self._sched = sched
    self._stat_num_br_run = 0
    self._stat_num_reimage = 0
    self._stat_annotation = ''
    self._logger = logger.GetLogger(self._sched.get_experiment().log_dir)
    self.daemon = True
    self._terminated = False
    self._active_br = None
    # Race condition accessing _active_br between _execute_benchmark_run and
    # terminate, so lock it up.
    self._active_br_lock = Lock()

  def terminate(self):
    """Refuse further work and terminate the benchmark_run in flight, if any."""
    self._terminated = True
    with self._active_br_lock:
      if self._active_br is not None:
        # BenchmarkRun.Terminate() terminates any running testcase via
        # suite_runner.Terminate and updates timeline.
        self._active_br.Terminate()

  def run(self):
    """Do the "run-test->(optionally reimage)->run-test" chore.

    Note - 'br' below means 'benchmark_run'.

    The whole body sits inside try/finally so that the scheduler is notified
    through dut_worker_finished() even if an unexpected exception escapes;
    otherwise this worker would never leave Schedv2's active list and
    Schedv2.is_complete() could never become true.
    """

    try:
      # Firstly, handle benchmarkruns that have cache hit.
      br = self._sched.get_cached_benchmark_run()
      while br:
        try:
          self._stat_annotation = 'finishing cached {}'.format(br)
          br.run()
        except RuntimeError:
          traceback.print_exc(file=sys.stdout)
        br = self._sched.get_cached_benchmark_run()

      # Secondly, handle benchmarkruns that need to be run on dut.
      self._setup_dut_label()
      self._logger.LogOutput('{} started.'.format(self))
      while not self._terminated:
        br = self._sched.get_benchmark_run(self._dut)
        if br is None:
          # No br left for this label. Considering reimaging.
          label = self._sched.allocate_label(self._dut)
          if label is None:
            # No br even for other labels. We are done.
            self._logger.LogOutput('ImageManager found no label '
                                   'for dut, stopping working '
                                   'thread {}.'.format(self))
            break
          if self._reimage(label):
            # Reimage to run other br fails, dut is doomed, stop
            # this thread.
            self._logger.LogWarning('Re-image failed, dut '
                                    'in an unstable state, stopping '
                                    'working thread {}.'.format(self))
            break
        else:
          # Execute the br.
          self._execute_benchmark_run(br)
    finally:
      self._stat_annotation = 'finished'
      # Thread finishes. Notify scheduler that I'm done.
      self._sched.dut_worker_finished(self)

  def _reimage(self, label):
    """Reimage image to label.

    Args:
      label: the label to reimage onto dut.

    Returns:
      0 if successful, otherwise 1.
    """

    # Termination could happen anywhere, check it.
    if self._terminated:
      return 1

    self._logger.LogOutput('Reimaging {} using {}'.format(self, label))
    self._stat_num_reimage += 1
    self._stat_annotation = 'reimaging using "{}"'.format(label.name)
    try:
      # Note, only 1 reimage at any given time, this is guaranteed in
      # ImageMachine, so no sync needed below.
      retval = self._sched.get_experiment().machine_manager.ImageMachine(
          self._dut,
          label)

      if retval:
        return 1
    except RuntimeError:
      return 1

    self._dut.label = label
    return 0

  def _execute_benchmark_run(self, br):
    """Execute a single benchmark_run.

    Note - this function is not expected to throw. Whatever br.run() does, the
    finally clause reports the br as finished to the experiment and clears
    the active-br slot used by terminate().
    """

    # Termination could happen anywhere, check it.
    if self._terminated:
      return

    self._logger.LogOutput('{} started working on {}'.format(self, br))
    self._stat_num_br_run += 1
    self._stat_annotation = 'executing {}'.format(br)
    # benchmark_run.run does not throw, but just play it safe here.
    try:
      assert br.owner_thread is None
      br.owner_thread = self
      with self._active_br_lock:
        self._active_br = br
      br.run()
    finally:
      self._sched.get_experiment().BenchmarkRunFinished(br)
      with self._active_br_lock:
        self._active_br = None

  def _setup_dut_label(self):
    """Try to match dut image with a certain experiment label.

    Reads the checksum file on the dut and compares it to each label's
    checksum. If such match is found, dut.label is set to that label, so we
    skip doing reimage and jump to execute some benchmark_runs. Otherwise
    dut.label is set to None.
    """

    checksum_file = '/usr/local/osimage_checksum_file'
    try:
      rv, checksum, _ = command_executer.GetCommandExecuter().\
          CrosRunCommandWOutput(
              'cat ' + checksum_file,
              chromeos_root=self._sched.get_labels(0).chromeos_root,
              machine=self._dut.name,
              print_to_console=False)
      if rv == 0:
        checksum = checksum.strip()
        for l in self._sched.get_labels():
          if l.checksum == checksum:
            self._logger.LogOutput("Dut '{}' is pre-installed with '{}'".format(
                self._dut.name, l))
            self._dut.label = l
            return
    except RuntimeError:
      traceback.print_exc(file=sys.stdout)
    self._dut.label = None

  def __str__(self):
    return 'DutWorker[dut="{}", label="{}"]'.format(
        self._dut.name, self._dut.label.name if self._dut.label else 'None')

  def dut(self):
    """Return the dut owned by this worker."""
    return self._dut

  def status_str(self):
    """Report thread status."""

    return ('Worker thread "{}", label="{}", benchmark_run={}, '
            'reimage={}, now {}'.format(
                self._dut.name, 'None' if self._dut.label is None else
                self._dut.label.name, self._stat_num_br_run,
                self._stat_num_reimage, self._stat_annotation))


class BenchmarkRunCacheReader(Thread):
  """The thread to read cache for a list of benchmark_runs.

  On creation, each instance of this class is given a br_list, which is a
  subset of experiment._benchmark_runs.
  """

  def __init__(self, schedv2, br_list):
    super(BenchmarkRunCacheReader, self).__init__()
    self._schedv2 = schedv2
    self._br_list = br_list
    self._logger = self._schedv2.get_logger()

  def run(self):
    """Read cache for each br; append cache hits to the scheduler's list."""
    for br in self._br_list:
      try:
        br.ReadCache()
        if br.cache_hit:
          self._logger.LogOutput('Cache hit - {}'.format(br))
          with self._schedv2.lock_on('_cached_br_list'):
            self._schedv2.get_cached_run_list().append(br)
        else:
          self._logger.LogOutput('Cache not hit - {}'.format(br))
      except RuntimeError:
        traceback.print_exc(file=sys.stderr)


class Schedv2(object):
  """New scheduler for crosperf."""

  def __init__(self, experiment):
    self._experiment = experiment
    self._logger = logger.GetLogger(experiment.log_dir)

    # Create shortcuts to nested data structure. "_duts" points to a list of
    # locked machines. _labels points to a list of all labels.
    self._duts = self._experiment.machine_manager.GetMachines()
    self._labels = self._experiment.labels

    # Bookkeeping for synchronization.
    self._workers_lock = Lock()
    # One Lock per object handed to lock_on(), created lazily.
    self._lock_map = defaultdict(Lock)
    # Guards lazy creation in _lock_map so that concurrent first calls to
    # lock_on() for the same object always receive the same Lock.
    self._lock_map_lock = Lock()

    # Test mode flag
    self._in_test_mode = test_flag.GetTestMode()

    # Read benchmarkrun cache.
    self._read_br_cache()

    # Mapping from label to a list of benchmark_runs.
    self._label_brl_map = {l: [] for l in self._labels}
    for br in self._experiment.benchmark_runs:
      assert br.label in self._label_brl_map
      # Only put no-cache-hit br into the map.
      if br not in self._cached_br_list:
        self._label_brl_map[br.label].append(br)

    # Use machine image manager to calculate initial label allocation.
    self._mim = MachineImageManager(self._labels, self._duts)
    self._mim.compute_initial_allocation()

    # Create worker thread, 1 per dut.
    self._active_workers = [DutWorker(dut, self) for dut in self._duts]
    self._finished_workers = []

    # Termination flag.
    self._terminated = False

  def run_sched(self):
    """Start all dut worker threads and return immediately."""

    # Iterate over a snapshot: a worker that finishes quickly removes itself
    # from _active_workers (see dut_worker_finished), and mutating the list
    # while iterating it would skip starting the next worker.
    with self._workers_lock:
      workers = list(self._active_workers)
    for w in workers:
      w.start()

  def _read_br_cache(self):
    """Use multi-threading to read cache for all benchmarkruns.

    We do this by firstly creating a few threads, and then assign each
    thread a segment of all brs. Each thread will check cache status for
    each br and put those with cache into '_cached_br_list'.
    """

    self._cached_br_list = []
    n_benchmarkruns = len(self._experiment.benchmark_runs)
    if n_benchmarkruns <= 4:
      # Use single thread to read cache.
      self._logger.LogOutput(('Starting to read cache status for '
                              '{} benchmark runs ...').format(n_benchmarkruns))
      BenchmarkRunCacheReader(self, self._experiment.benchmark_runs).run()
      return

    # Split benchmarkruns set into segments. Each segment will be handled by
    # a thread. Note, we use (x+3)//4 to mimic math.ceil(x/4); floor division
    # keeps the result an int under both Python 2 and Python 3.
    n_threads = max(2, min(20, (n_benchmarkruns + 3) // 4))
    self._logger.LogOutput(('Starting {} threads to read cache status for '
                            '{} benchmark runs ...').format(n_threads,
                                                            n_benchmarkruns))
    # Balanced contiguous split: the first 'extra' segments get one more br.
    # Since n_benchmarkruns > 4, n_threads <= n_benchmarkruns, so no segment
    # is empty.
    base, extra = divmod(n_benchmarkruns, n_threads)
    benchmarkrun_segments = []
    start = 0
    for i in range(n_threads):
      end = start + base + (1 if i < extra else 0)
      benchmarkrun_segments.append(self._experiment.benchmark_runs[start:end])
      start = end

    # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs.
    assert sum(len(x) for x in benchmarkrun_segments) == n_benchmarkruns

    # Create and start all readers.
    cache_readers = [
        BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments
    ]

    for x in cache_readers:
      x.start()

    # Wait till all readers finish.
    for x in cache_readers:
      x.join()

    # Summarize.
    self._logger.LogOutput(
        'Total {} cache hit out of {} benchmark_runs.'.format(
            len(self._cached_br_list), n_benchmarkruns))

  def get_cached_run_list(self):
    """Return the (mutable) list of cache-hit benchmark_runs."""
    return self._cached_br_list

  def get_label_map(self):
    """Return the mapping from label to its pending benchmark_runs."""
    return self._label_brl_map

  def get_experiment(self):
    return self._experiment

  def get_labels(self, i=None):
    """Return all labels, or only the i-th label when i is given."""
    if i is None:
      return self._labels
    return self._labels[i]

  def get_logger(self):
    return self._logger

  def get_cached_benchmark_run(self):
    """Get a benchmark_run with 'cache hit'.

    Returns:
      The benchmark that has cache hit, if any. Otherwise none.
    """

    with self.lock_on('_cached_br_list'):
      if self._cached_br_list:
        return self._cached_br_list.pop()
      return None

  def get_benchmark_run(self, dut):
    """Get a benchmark_run (br) object for a certain dut.

    Args:
      dut: the dut for which a br is returned.

    Returns:
      A br with its label matching that of the dut. If no such br could be
      found, return None (this usually means a reimage is required for the
      dut).
    """

    # If terminated, stop providing any br.
    if self._terminated:
      return None

    # If dut bears an unrecognized label, return None.
    if dut.label is None:
      return None

    # If br list for the dut's label is empty (that means all brs for this
    # label have been done), return None.
    with self.lock_on(dut.label):
      brl = self._label_brl_map[dut.label]
      if not brl:
        return None
      # Return the first br.
      return brl.pop(0)

  def allocate_label(self, dut):
    """Allocate a label to a dut.

    The work is delegated to MachineImageManager.

    The dut_worker calling this method is responsible for reimage the dut to
    this label.

    Args:
      dut: the dut that needs a new label to be reimaged onto it.

    Returns:
      The label or None.
    """

    if self._terminated:
      return None

    return self._mim.allocate(dut, self)

  def dut_worker_finished(self, dut_worker):
    """Notify schedv2 that the dut_worker thread finished.

    Args:
      dut_worker: the thread that is about to end.
    """

    self._logger.LogOutput('{} finished.'.format(dut_worker))
    with self._workers_lock:
      self._active_workers.remove(dut_worker)
      self._finished_workers.append(dut_worker)

  def is_complete(self):
    """Return True once every dut worker has reported it finished."""
    with self._workers_lock:
      return not self._active_workers

  def lock_on(self, my_object):
    """Return the Lock dedicated to my_object, creating it on first use."""
    with self._lock_map_lock:
      return self._lock_map[my_object]

  def terminate(self):
    """Mark flag so we stop providing br/reimages.

    Also terminate each DutWorker, so they refuse to execute br or reimage.
    """

    self._terminated = True
    # Snapshot under the lock: workers remove themselves from
    # _active_workers concurrently via dut_worker_finished, and iterating the
    # live list could skip a worker. Terminate outside the lock so a slow
    # BenchmarkRun.Terminate() does not block workers from finishing.
    with self._workers_lock:
      workers = list(self._active_workers)
    for dut_worker in workers:
      dut_worker.terminate()

  def threads_status_as_string(self):
    """Report the dut worker threads status."""

    # Snapshot both lists consistently; workers move between them
    # concurrently via dut_worker_finished.
    with self._workers_lock:
      active = list(self._active_workers)
      finished = list(self._finished_workers)

    status = '{} active threads, {} finished threads.\n'.format(
        len(active), len(finished))
    status += '  Active threads:'
    for dw in active:
      status += '\n    ' + dw.status_str()
    if finished:
      status += '\n  Finished threads:'
      for dw in finished:
        status += '\n    ' + dw.status_str()
    return status