#!/usr/bin/python

# machine_manager.py revision c152cdb3fa9aca8e1c9c9699913ca17d50f1e42b

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Track, lock and image ChromeOS test machines for benchmark runs."""

import hashlib
import math
import os.path
import re
import sys
import threading
import time

import image_chromeos
import file_lock_machine

from utils import command_executer
from utils import logger
from utils import misc
from utils.file_utils import FileUtils

# File on the DUT that is read to learn the checksum of the installed image.
CHECKSUM_FILE = "/usr/local/osimage_checksum_file"


class BadChecksum(Exception):
    """Raised if all machines for a label don't have the same checksum."""


class BadChecksumString(Exception):
    """Raised if all machines for a label don't have the same checksum string."""


class MissingLocksDirectory(Exception):
    """Raised when cannot find/access the machine locks directory."""


class CrosCommandError(Exception):
    """Raised when an error occurs running command on DUT."""


class CrosMachine(object):
    """One device under test (DUT) plus the identity data read from it.

    On construction the machine is probed over the command executer; if it
    answers, its memory size, cpuinfo and a machine id are collected and
    hashed into `machine_checksum` / `machine_id_checksum`.
    """

    def __init__(self, name, chromeos_root, log_level, cmd_exec=None):
        """Create the machine record and probe the DUT.

        Args:
          name: Remote name of the DUT, passed as `machine=` to the executer.
          chromeos_root: ChromeOS checkout passed to every remote command.
          log_level: Log level used when a command executer must be created.
          cmd_exec: Optional command executer; one is created when omitted.
        """
        self.name = name
        self.image = None
        # We relate a dut with a label if we reimage the dut using label or we
        # detect at the very beginning that the dut is running this label.
        self.label = None
        self.checksum = None
        self.locked = False
        self.released_time = time.time()
        self.test_run = None
        self.chromeos_root = chromeos_root
        self.log_level = log_level
        self.ce = cmd_exec or command_executer.GetCommandExecuter(
            log_level=self.log_level)
        # Give every probed attribute a defined value so that an unreachable
        # machine does not raise AttributeError when these are read later.
        self.meminfo = None
        self.cpuinfo = None
        self.phys_kbytes = None
        self.checksum_string = None
        self.machine_id = None
        self.machine_checksum = None
        self.machine_id_checksum = None
        self.SetUpChecksumInfo()

    def SetUpChecksumInfo(self):
        """(Re)compute the machine checksums from data read off the DUT.

        If the DUT is not reachable, `machine_checksum` is set to None and
        nothing else is touched.
        """
        if not self.IsReachable():
            self.machine_checksum = None
            return
        self._GetMemoryInfo()
        self._GetCPUInfo()
        self._ComputeMachineChecksumString()
        self._GetMachineID()
        self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
        self.machine_id_checksum = self._GetMD5Checksum(self.machine_id)

    def IsReachable(self):
        """Return True if a trivial command (`ls`) succeeds on the DUT."""
        command = "ls"
        ret = self.ce.CrosRunCommand(command,
                                     machine=self.name,
                                     chromeos_root=self.chromeos_root)
        return not ret

    def _ParseMemoryInfo(self):
        """Derive `phys_kbytes` from the first line of `self.meminfo`."""
        line = self.meminfo.splitlines()[0]
        usable_kbytes = int(line.split()[1])
        # This code is from src/third_party/test/files/client/bin/base_utils.py
        # usable_kbytes is system's usable DRAM in kbytes,
        #   as reported by memtotal() from device /proc/meminfo memtotal
        #   after Linux deducts 1.5% to 9.5% for system table overhead
        # Undo the unknown actual deduction by rounding up
        #   to next small multiple of a big power-of-two
        #   eg  12GB - 5.1% gets rounded back up to 12GB
        # NOTE(review): the notes here quote a 1.5% lower bound while the
        # constant below is 0.5%. Left untouched because the result feeds the
        # machine checksum string; confirm which value is intended.
        mindeduct = 0.005  # 0.5 percent
        maxdeduct = 0.095  # 9.5 percent
        # deduction range 1.5% .. 9.5% supports physical mem sizes
        #    6GB .. 12GB in steps of .5GB
        #   12GB .. 24GB in steps of 1 GB
        #   24GB .. 48GB in steps of 2 GB ...
        # Finer granularity in physical mem sizes would require
        #   tighter spread between min and max possible deductions

        # increase mem size by at least min deduction, without rounding
        min_kbytes = int(usable_kbytes / (1.0 - mindeduct))
        # increase mem size further by 2**n rounding, by 0..roundKb or more
        round_kbytes = int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes
        # find least binary roundup 2**n that covers worst-case roundKb
        mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2)))
        # have round_kbytes <= mod2n < round_kbytes*2
        # round min_kbytes up to next multiple of mod2n
        phys_kbytes = min_kbytes + mod2n - 1
        phys_kbytes -= phys_kbytes % mod2n  # clear low bits
        self.phys_kbytes = phys_kbytes

    def _GetMemoryInfo(self):
        """Read /proc/meminfo from the DUT and parse it.

        Raises:
          AssertionError: the remote command returned a non-zero status.
        """
        # TODO yunlian: when the machine in rebooting, it will not return
        # meminfo, the assert does not catch it either
        command = "cat /proc/meminfo"
        ret, self.meminfo, _ = self.ce.CrosRunCommand(
            command, return_output=True,
            machine=self.name,
            chromeos_root=self.chromeos_root)
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O.
        if ret != 0:
            raise AssertionError(
                "Could not get meminfo from machine: %s" % self.name)
        self._ParseMemoryInfo()

    # cpuinfo format is different across architecture
    # need to find a better way to parse it.
    def _ParseCPUInfo(self, cpuinfo):
        """Placeholder parser; currently ignores `cpuinfo` and returns 0."""
        return 0

    def _GetCPUInfo(self):
        """Read /proc/cpuinfo from the DUT into `self.cpuinfo`.

        Raises:
          AssertionError: the remote command returned a non-zero status.
        """
        command = "cat /proc/cpuinfo"
        ret, self.cpuinfo, _ = self.ce.CrosRunCommand(
            command, return_output=True,
            machine=self.name,
            chromeos_root=self.chromeos_root)
        if ret != 0:
            raise AssertionError(
                "Could not get cpuinfo from machine: %s" % self.name)
        self._ParseCPUInfo(self.cpuinfo)

    def _ComputeMachineChecksumString(self):
        """Build `checksum_string` from filtered cpuinfo plus memory size.

        Lines containing "MHz", "BogoMIPS" or "bogomips" are dropped; the
        remaining lines are concatenated without separators and followed by
        a space and `phys_kbytes`.
        """
        exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"]
        kept_lines = [line for line in self.cpuinfo.splitlines()
                      if not any(e in line for e in exclude_lines_list)]
        self.checksum_string = "".join(kept_lines) + " " + str(self.phys_kbytes)

    def _GetMD5Checksum(self, ss):
        """Return the hex MD5 digest of `ss`, or "" when `ss` is empty/None."""
        if not ss:
            return ""
        # hashlib needs bytes; text is encoded so this also works where `str`
        # is not a byte string.
        if not isinstance(ss, bytes):
            ss = ss.encode("utf-8")
        return hashlib.md5(ss).hexdigest()

    def _GetMachineID(self):
        """Set `machine_id` from VPD data or, failing that, from ifconfig.

        Tries, in order: the first "Product" line of `dump_vpd_log`, all
        "HWaddr" lines of `ifconfig`, all "ether" lines of `ifconfig`.

        Raises:
          AssertionError: none of the sources yielded a matching line.
        """
        command = "dump_vpd_log --full --stdout"
        _, if_out, _ = self.ce.CrosRunCommand(
            command, return_output=True,
            machine=self.name,
            chromeos_root=self.chromeos_root)
        product_lines = [l for l in if_out.splitlines() if "Product" in l]
        if product_lines:
            self.machine_id = product_lines[0]
            return

        command = "ifconfig"
        _, if_out, _ = self.ce.CrosRunCommand(
            command, return_output=True,
            machine=self.name,
            chromeos_root=self.chromeos_root)
        ifconfig_lines = if_out.splitlines()
        for marker in ("HWaddr", "ether"):
            matches = [l for l in ifconfig_lines if marker in l]
            if matches:
                self.machine_id = "_".join(matches)
                return
        raise AssertionError(
            "Could not get machine_id from machine: %s" % self.name)

    def __str__(self):
        """Return name, image, checksum, locked flag and release time."""
        fields = [self.name,
                  str(self.image),
                  str(self.checksum),
                  str(self.locked),
                  str(self.released_time)]
        return ", ".join(fields)


class MachineManager(object):
    """Lock, image and unlock machines locally for benchmark runs.

    This class contains methods and calls to lock, unlock and image
    machines and distribute machines to each benchmark run.  The assumption is
    that all of the machines for the experiment have been globally locked
    (using an AFE server) in the ExperimentRunner, but the machines still need
    to be locally locked/unlocked (allocated to benchmark runs) to prevent
    multiple benchmark runs within the same experiment from trying to use the
    same machine at the same time.
    """

    def __init__(self, chromeos_root, acquire_timeout, log_level, locks_dir,
                 cmd_exec=None, lgr=None):
        """Set up an empty manager.

        Args:
          chromeos_root: Default ChromeOS checkout for remote commands.
          acquire_timeout: Seconds AcquireMachine may wait; it is decremented
            as AcquireMachine sleeps.
          log_level: Log level for messages and for created executers.
          locks_dir: Directory used for file locks; falsy disables them.
          cmd_exec: Optional command executer; one is created when omitted.
          lgr: Optional logger; the default logger is used when omitted.

        Raises:
          MissingLocksDirectory: `locks_dir` is set but is not a directory.
        """
        self._lock = threading.RLock()
        # Every machine that was successfully added.
        self._all_machines = []
        # Machines this manager currently holds (see _TryToLockMachine).
        self._machines = []
        self.image_lock = threading.Lock()
        self.num_reimages = 0
        self.chromeos_root = None
        self.machine_checksum = {}
        self.machine_checksum_string = {}
        self.acquire_timeout = acquire_timeout
        self.log_level = log_level
        self.locks_dir = locks_dir
        self.ce = cmd_exec or command_executer.GetCommandExecuter(
            log_level=self.log_level)
        self.logger = lgr or logger.GetLogger()

        if self.locks_dir and not os.path.isdir(self.locks_dir):
            raise MissingLocksDirectory("Cannot access locks directory: %s"
                                        % self.locks_dir)

        self._initialized_machines = []
        self.chromeos_root = chromeos_root

    def RemoveNonLockedMachines(self, locked_machines):
        """Drop every machine whose name is not in `locked_machines`.

        Args:
          locked_machines: Collection of machine names to keep.
        """
        # Filter into a new list and assign in place: removing from a list
        # while iterating over it skips the element after each removal.
        self._all_machines[:] = [m for m in self._all_machines
                                 if m.name in locked_machines]
        self._machines[:] = [m for m in self._machines
                             if m.name in locked_machines]

    def GetChromeVersion(self, machine):
        """Get the version of Chrome running on the DUT.

        Args:
          machine: Machine object whose `name` identifies the DUT.

        Returns:
          The command output with trailing whitespace stripped.

        Raises:
          CrosCommandError: the version command returned a non-zero status.
        """
        cmd = "/opt/google/chrome/chrome --version"
        ret, version, _ = self.ce.CrosRunCommand(
            cmd, return_output=True,
            machine=machine.name,
            chromeos_root=self.chromeos_root)
        if ret != 0:
            raise CrosCommandError("Couldn't get Chrome version from %s."
                                   % machine.name)
        return version.rstrip()

    def _PushImage(self, image_chromeos_args):
        """Log (unless verbose) and run one image_chromeos.DoImage attempt."""
        if self.log_level != "verbose":
            self.logger.LogOutput("Pushing image onto machine.")
            self.logger.LogOutput("Running image_chromeos.DoImage with %s"
                                  % " ".join(image_chromeos_args))
        return image_chromeos.DoImage(image_chromeos_args)

    def ImageMachine(self, machine, label):
        """Install the label's image on `machine` unless already present.

        A failed first attempt is followed by a reboot of the DUT, a 60
        second wait and one retry.

        Args:
          machine: Machine object to image.
          label: Label describing the image (checksum, chromeos_image,
            image_args, board, chromeos_root, chrome_version).

        Returns:
          None if the machine's checksum already matches the label,
          otherwise the (falsy) return value of the successful DoImage call.

        Raises:
          Exception: imaging failed on both attempts.
        """
        checksum = label.checksum

        if checksum and (machine.checksum == checksum):
            return
        chromeos_root = label.chromeos_root
        if not chromeos_root:
            chromeos_root = self.chromeos_root
        image_chromeos_args = [image_chromeos.__file__,
                               "--no_lock",
                               "--chromeos_root=%s" % chromeos_root,
                               "--image=%s" % label.chromeos_image,
                               "--image_args=%s" % label.image_args,
                               "--remote=%s" % machine.name,
                               "--logging_level=%s" % self.log_level]
        if label.board:
            image_chromeos_args.append("--board=%s" % label.board)

        # Currently can't image two machines at once.
        # So have to serialized on this lock.
        save_ce_log_level = self.ce.log_level
        if self.log_level != "verbose":
            self.ce.log_level = "average"

        try:
            with self.image_lock:
                retval = self._PushImage(image_chromeos_args)
                if retval:
                    cmd = "reboot && exit"
                    if self.log_level != "verbose":
                        self.logger.LogOutput("reboot & exit.")
                    self.ce.CrosRunCommand(cmd, machine=machine.name,
                                           chromeos_root=self.chromeos_root)
                    time.sleep(60)
                    retval = self._PushImage(image_chromeos_args)
                if retval:
                    raise Exception("Could not image machine: '%s'."
                                    % machine.name)
                self.num_reimages += 1
                machine.checksum = checksum
                machine.image = label.chromeos_image
                machine.label = label

            if not label.chrome_version:
                label.chrome_version = self.GetChromeVersion(machine)
        finally:
            # Restore the executer's log level even when imaging fails.
            self.ce.log_level = save_ce_log_level
        return retval

    def ComputeCommonCheckSum(self, label):
        """Record the machine checksum shared by all machines of `label`.

        Raises:
          BadChecksum: two machines of the label have different checksums.
        """
        # Since this is used for cache lookups before the machines have been
        # compared/verified, check here to make sure they all have the same
        # checksum (otherwise the cache lookup may not be valid).
        common_checksum = None
        for machine in self.GetMachines(label):
            # Make sure the machine's checksums are calculated.
            if not machine.machine_checksum:
                machine.SetUpChecksumInfo()
            cs = machine.machine_checksum
            # If this is the first machine we've examined, initialize
            # common_checksum.
            if not common_checksum:
                common_checksum = cs
            # Make sure this machine's checksum matches our 'common' checksum.
            if cs != common_checksum:
                raise BadChecksum("Machine checksums do not match!")
        self.machine_checksum[label.name] = common_checksum

    def ComputeCommonCheckSumString(self, label):
        """Record the checksum string of the first machine that has one."""
        # The assumption is that this function is only called AFTER
        # ComputeCommonCheckSum, so there is no need to verify the machines
        # are the same here.  If this is ever changed, this function should be
        # modified to verify that all the machines for a given label are the
        # same.
        for machine in self.GetMachines(label):
            if machine.checksum_string:
                self.machine_checksum_string[label.name] = (
                    machine.checksum_string)
                break

    def _TryToLockMachine(self, cros_machine):
        """Take `cros_machine` into the held set, file-locking it if enabled.

        Does nothing if a machine with the same name is already held. When
        the machine is taken, its image checksum is read from CHECKSUM_FILE
        on the DUT (left unchanged if that read fails).
        """
        with self._lock:
            assert cros_machine, "Machine can't be None"
            for m in self._machines:
                if m.name == cros_machine.name:
                    return
            # Without a locks directory no file lock is taken and the machine
            # is treated as locked.
            locked = True
            if self.locks_dir:
                locked = file_lock_machine.Machine(
                    cros_machine.name, self.locks_dir).Lock(True, sys.argv[0])
            if locked:
                self._machines.append(cros_machine)
                command = "cat %s" % CHECKSUM_FILE
                ret, out, _ = self.ce.CrosRunCommand(
                    command, return_output=True,
                    chromeos_root=self.chromeos_root,
                    machine=cros_machine.name)
                if ret == 0:
                    cros_machine.checksum = out.strip()
            elif self.locks_dir:
                self.logger.LogOutput("Couldn't lock: %s" % cros_machine.name)

    # This is called from single threaded mode.
    def AddMachine(self, machine_name):
        """Probe `machine_name` and register it if a checksum was computed.

        A machine whose `machine_checksum` is falsy after probing (e.g. it
        was unreachable) is silently not added.
        """
        with self._lock:
            for m in self._all_machines:
                assert m.name != machine_name, (
                    "Tried to double-add %s" % machine_name)
            if self.log_level != "verbose":
                self.logger.LogOutput("Setting up remote access to %s"
                                      % machine_name)
                self.logger.LogOutput(
                    "Checking machine characteristics for %s" % machine_name)
            cm = CrosMachine(machine_name, self.chromeos_root, self.log_level)
            if cm.machine_checksum:
                self._all_machines.append(cm)

    def RemoveMachine(self, machine_name):
        """Stop holding `machine_name` and release its file lock if enabled."""
        with self._lock:
            self._machines = [m for m in self._machines
                              if m.name != machine_name]
            if self.locks_dir:
                res = file_lock_machine.Machine(machine_name,
                                                self.locks_dir).Unlock(True)
                if not res:
                    self.logger.LogError("Could not unlock machine: '%s'."
                                         % machine_name)

    def ForceSameImageToAllMachines(self, label):
        """Image every machine of `label` and refresh its checksum info."""
        machines = self.GetMachines(label)
        for m in machines:
            self.ImageMachine(m, label)
            m.SetUpChecksumInfo()

    def AcquireMachine(self, chromeos_image, label, throw=False):
        """Hand out an unlocked machine of `label` to the calling thread.

        Preference order: a machine already carrying the label's image, then
        a machine with no known image checksum, then a machine released
        between 8 and 15 seconds ago.

        Args:
          chromeos_image: Not used by this implementation.
          label: Label whose machines are candidates.
          throw: Not used by this implementation.

        Returns:
          The acquired machine (marked locked, with `test_run` set to the
          current thread), or None if no candidate qualifies right now.
        """
        image_checksum = label.checksum
        machines = self.GetMachines(label)
        check_interval_time = 120
        with self._lock:
            # Lazily external lock machines
            while self.acquire_timeout >= 0:
                for m in machines:
                    # NOTE(review): `machines` comes from GetMachines(), which
                    # here draws from _all_machines, so this looks always
                    # False; possibly `_machines` was intended - confirm.
                    new_machine = m not in self._all_machines
                    self._TryToLockMachine(m)
                    if new_machine:
                        m.released_time = time.time()
                if self.GetAvailableMachines(label):
                    break
                else:
                    sleep_time = max(1, min(self.acquire_timeout,
                                            check_interval_time))
                    time.sleep(sleep_time)
                    self.acquire_timeout -= sleep_time

            if self.acquire_timeout < 0:
                machine_names = [machine.name for machine in machines]
                self.logger.LogFatal("Could not acquire any of the "
                                     "following machines: '%s'"
                                     % ", ".join(machine_names))

###         for m in self._machines:
###             if (m.locked and time.time() - m.released_time < 10 and
###                     m.checksum == image_checksum):
###                 return None
            for m in [machine
                      for machine in self.GetAvailableMachines(label)
                      if not machine.locked]:
                if image_checksum and (m.checksum == image_checksum):
                    m.locked = True
                    m.test_run = threading.current_thread()
                    return m
            for m in [machine
                      for machine in self.GetAvailableMachines(label)
                      if not machine.locked]:
                if not m.checksum:
                    m.locked = True
                    m.test_run = threading.current_thread()
                    return m
            # This logic ensures that threads waiting on a machine will get a
            # machine with a checksum equal to their image over other threads.
            # This saves time when crosperf initially assigns the machines to
            # threads by minimizing the number of re-images.
            # TODO(asharif): If we centralize the thread-scheduler, we wont
            # need this code and can implement minimal reimaging code more
            # cleanly.
            for m in [machine
                      for machine in self.GetAvailableMachines(label)
                      if not machine.locked]:
                if time.time() - m.released_time > 15:
                    # The release time gap is too large, so it is probably in
                    # the start stage, we need to reset the released_time.
                    m.released_time = time.time()
                elif time.time() - m.released_time > 8:
                    m.locked = True
                    m.test_run = threading.current_thread()
                    return m
        return None

    def GetAvailableMachines(self, label=None):
        """Return held machines, restricted to `label.remote` if given."""
        if not label:
            return self._machines
        return [m for m in self._machines if m.name in label.remote]

    def GetMachines(self, label=None):
        """Return all added machines, restricted to `label.remote` if given."""
        if not label:
            return self._all_machines
        return [m for m in self._all_machines if m.name in label.remote]

    def ReleaseMachine(self, machine):
        """Mark the held machine named like `machine` as free again."""
        with self._lock:
            for m in self._machines:
                if machine.name == m.name:
                    assert m.locked, "Tried to double-release %s" % m.name
                    m.released_time = time.time()
                    m.locked = False
                    m.status = "Available"
                    break

    def Cleanup(self):
        """Release the file lock of every held machine."""
        with self._lock:
            # File locks are only taken when a locks directory is configured
            # (see _TryToLockMachine), so there is nothing to undo otherwise.
            if not self.locks_dir:
                return
            # Unlock all machines (via file lock)
            for m in self._machines:
                res = file_lock_machine.Machine(m.name,
                                                self.locks_dir).Unlock(True)
                if not res:
                    self.logger.LogError("Could not unlock machine: '%s'."
                                         % m.name)

    def __str__(self):
        """Return a header line followed by one line per held machine."""
        with self._lock:
            l = ["MachineManager Status:"]
            for m in self._machines:
                l.append(str(m))
            return "\n".join(l)

    def AsString(self):
        """Return a fixed-width status table of the held machines."""
        with self._lock:
            stringify_fmt = "%-30s %-10s %-4s %-25s %-32s"
            header = stringify_fmt % ("Machine", "Thread", "Lock", "Status",
                                      "Checksum")
            table = [header]
            for m in self._machines:
                if m.test_run:
                    test_name = m.test_run.name
                    test_status = m.test_run.timeline.GetLastEvent()
                else:
                    test_name = ""
                    test_status = ""

                try:
                    machine_string = stringify_fmt % (m.name,
                                                      test_name,
                                                      m.locked,
                                                      test_status,
                                                      m.checksum)
                except ValueError:
                    machine_string = ""
                table.append(machine_string)
            return "Machine Status:\n%s" % "\n".join(table)

    def GetAllCPUInfo(self, labels):
        """Get cpuinfo for labels, merge them if their cpuinfo are the same."""
        dic = {}
        for label in labels:
            # Only the first machine matching the label is consulted.
            for machine in self._all_machines:
                if machine.name in label.remote:
                    dic.setdefault(machine.cpuinfo, []).append(label.name)
                    break
        chunks = []
        for key, v in dic.items():
            chunks.append(" ".join(v))
            chunks.append("\n-------------------\n")
            chunks.append(key)
            chunks.append("\n\n\n")
        return "".join(chunks)


class MockCrosMachine(CrosMachine):
    """CrosMachine stand-in for tests that never contacts a real DUT."""

    def __init__(self, name, chromeos_root, log_level):
        """Fill in the fields directly; the parent constructor is not run."""
        self.name = name
        self.image = None
        self.checksum = None
        self.locked = False
        self.released_time = time.time()
        self.test_run = None
        self.chromeos_root = chromeos_root
        self.checksum_string = re.sub(r"\d", "", name)
        # In test, we assume "lumpy1", "lumpy2" are the same machine.
        self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
        self.log_level = log_level
        self.label = None

    def IsReachable(self):
        """Mock machines are always reachable."""
        return True


class MockMachineManager(MachineManager):
    """MachineManager stand-in for tests; hands out MockCrosMachine objects."""

    def __init__(self, chromeos_root, acquire_timeout, log_level,
                 dummy_locks_dir):
        """Initialise the parent; `dummy_locks_dir` is ignored.

        The parent is given file_lock_machine.Machine.LOCKS_DIR as its locks
        directory instead.
        """
        super(MockMachineManager, self).__init__(
            chromeos_root, acquire_timeout, log_level,
            file_lock_machine.Machine.LOCKS_DIR)

    def _TryToLockMachine(self, cros_machine):
        """Hold the machine unconditionally and blank its image checksum."""
        self._machines.append(cros_machine)
        cros_machine.checksum = ""

    def AddMachine(self, machine_name):
        """Register a MockCrosMachine named `machine_name`."""
        with self._lock:
            for m in self._all_machines:
                assert m.name != machine_name, (
                    "Tried to double-add %s" % machine_name)
            cm = MockCrosMachine(machine_name, self.chromeos_root,
                                 self.log_level)
            assert cm.machine_checksum, (
                "Could not find checksum for machine %s" % machine_name)
            # In Original MachineManager, the test is
            # 'if cm.machine_checksum:' - if a machine is unreachable, then
            # its machine_checksum is None. Here we cannot do this, because
            # machine_checksum is always faked, so we directly test
            # cm.IsReachable, which is properly mocked.
            if cm.IsReachable():
                self._all_machines.append(cm)

    def AcquireMachine(self, chromeos_image, label, throw=False):
        """Lock and return the first unlocked machine, or None."""
        for machine in self._all_machines:
            if not machine.locked:
                machine.locked = True
                return machine
        return None

    def ImageMachine(self, machine_name, label):
        """Pretend imaging succeeded."""
        return 0

    def ReleaseMachine(self, machine):
        """Mark `machine` as unlocked."""
        machine.locked = False

    def GetMachines(self, label):
        """Return every added machine regardless of `label`."""
        return self._all_machines

    def GetAvailableMachines(self, label):
        """Return every added machine regardless of `label`."""
        return self._all_machines