machine_manager.py revision 5c09fc2966ac49263ce7154c2905f2a86aeda297
#!/usr/bin/python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Track, lock, image and hand out the remote ChromeOS machines of a run."""

import hashlib
import image_chromeos
import lock_machine
import math
import os.path
import re
import sys
import threading
import time


from utils import command_executer
from utils import logger
from utils import misc
from utils.file_utils import FileUtils

from image_checksummer import ImageChecksummer

# File on the remote machine holding the checksum of the installed image.
CHECKSUM_FILE = "/usr/local/osimage_checksum_file"


class NonMatchingMachines(Exception):
  """Raised by AcquireMachine(throw=True) when a label's machines differ."""
  pass


class MissingLocksDirectory(Exception):
  """Raised when cannot find/access the machine locks directory."""


class CrosMachine(object):
  """A remote ChromeOS machine plus the bookkeeping kept about it."""

  def __init__(self, name, chromeos_root, log_level, cmd_exec=None):
    self.name = name
    self.image = None          # Path of the image last pushed, if any.
    self.checksum = None       # Checksum of the image believed installed.
    self.locked = False        # True while handed out by AcquireMachine.
    self.released_time = time.time()
    self.test_run = None       # Thread that currently holds the machine.
    self.chromeos_root = chromeos_root
    self.log_level = log_level
    self.ce = cmd_exec or command_executer.GetCommandExecuter(
        log_level=self.log_level)
    self.SetUpChecksumInfo()

  def SetUpChecksumInfo(self):
    """Probe the machine's hardware and compute its identifying checksums.

    If the machine is unreachable, machine_checksum is set to None and no
    other attribute is computed.
    """
    if not self.IsReachable():
      self.machine_checksum = None
      return
    self._GetMemoryInfo()
    self._GetCPUInfo()
    self._ComputeMachineChecksumString()
    self._GetMachineID()
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
    self.machine_id_checksum = self._GetMD5Checksum(self.machine_id)

  def IsReachable(self):
    """Return True iff a trivial `ls` on the machine exits with status 0."""
    ret = self.ce.CrosRunCommand("ls",
                                 machine=self.name,
                                 chromeos_root=self.chromeos_root)
    return not ret

  def _ParseMemoryInfo(self):
    """Estimate physical memory from /proc/meminfo into self.phys_kbytes.

    Only the first line of self.meminfo is read (presumably the MemTotal
    entry; its second whitespace-separated field is taken as kbytes).
    """
    line = self.meminfo.splitlines()[0]
    usable_kbytes = int(line.split()[1])
    # This code is from src/third_party/test/files/client/bin/base_utils.py
    # usable_kbytes is system's usable DRAM in kbytes,
    # as reported by memtotal() from device /proc/meminfo memtotal
    # after Linux deducts 1.5% to 9.5% for system table overhead
    # Undo the unknown actual deduction by rounding up
    # to next small multiple of a big power-of-two
    # eg 12GB - 5.1% gets rounded back up to 12GB
    # NOTE(review): the comments say the minimum deduction is 1.5%, but
    # mindeduct below is 0.5%; confirm which one is intended.
    mindeduct = 0.005  # 0.5 percent
    maxdeduct = 0.095  # 9.5 percent
    # deduction range 1.5% .. 9.5% supports physical mem sizes
    #    6GB .. 12GB in steps of .5GB
    #   12GB .. 24GB in steps of 1 GB
    #   24GB .. 48GB in steps of 2 GB ...
    # Finer granularity in physical mem sizes would require
    # tighter spread between min and max possible deductions

    # increase mem size by at least min deduction, without rounding
    min_kbytes = int(usable_kbytes / (1.0 - mindeduct))
    # increase mem size further by 2**n rounding, by 0..roundKb or more
    round_kbytes = int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes
    # find least binary roundup 2**n that covers worst-case roundKb
    mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2)))
    # have round_kbytes <= mod2n < round_kbytes*2
    # round min_kbytes up to next multiple of mod2n
    phys_kbytes = min_kbytes + mod2n - 1
    phys_kbytes -= phys_kbytes % mod2n  # clear low bits
    self.phys_kbytes = phys_kbytes

  def _GetMemoryInfo(self):
    """Read /proc/meminfo (as root) into self.meminfo and parse it."""
    # TODO yunlian: when the machine in rebooting, it will not return
    # meminfo, the assert does not catch it either
    command = "cat /proc/meminfo"
    ret, self.meminfo, _ = self.ce.CrosRunCommand(
        command, return_output=True, machine=self.name, username="root",
        chromeos_root=self.chromeos_root)
    assert ret == 0, "Could not get meminfo from machine: %s" % self.name
    if ret == 0:
      self._ParseMemoryInfo()

  # cpuinfo format is different across architectures;
  # need to find a better way to parse it.
  def _ParseCPUInfo(self, cpuinfo):
    """Placeholder parser; currently ignores cpuinfo and returns 0."""
    return 0

  def _GetCPUInfo(self):
    """Read /proc/cpuinfo (as root) into self.cpuinfo."""
    command = "cat /proc/cpuinfo"
    ret, self.cpuinfo, _ = self.ce.CrosRunCommand(
        command, return_output=True, machine=self.name, username="root",
        chromeos_root=self.chromeos_root)
    assert ret == 0, "Could not get cpuinfo from machine: %s" % self.name
    if ret == 0:
      self._ParseCPUInfo(self.cpuinfo)

  def _ComputeMachineChecksumString(self):
    """Build self.checksum_string from cpuinfo and the physical memory size.

    cpuinfo lines containing "MHz", "BogoMIPS" or "bogomips" are dropped
    (presumably because those values vary between identical machines).
    The remaining lines are concatenated with no separator, followed by a
    space and str(self.phys_kbytes).
    """
    exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"]
    kept_lines = [line for line in self.cpuinfo.splitlines()
                  if not any(e in line for e in exclude_lines_list)]
    self.checksum_string = "".join(kept_lines) + " " + str(self.phys_kbytes)

  def _GetMD5Checksum(self, ss):
    """Return the hex MD5 digest of ss, or "" when ss is empty/None."""
    if ss:
      return hashlib.md5(ss).hexdigest()
    return ""

  def _GetMachineID(self):
    """Set self.machine_id from VPD data or, failing that, ifconfig output.

    Preference order: the first VPD line containing "Product"; else all
    ifconfig lines containing "HWaddr" joined by "_"; else all ifconfig
    lines containing "ether" joined by "_".  Asserts if none is found.
    """
    command = "dump_vpd_log --full --stdout"
    _, if_out, _ = self.ce.CrosRunCommand(command, return_output=True,
                                          machine=self.name,
                                          chromeos_root=self.chromeos_root)
    product_lines = [line for line in if_out.splitlines()
                     if "Product" in line]
    if product_lines:
      self.machine_id = product_lines[0]
      return
    command = "ifconfig"
    _, if_out, _ = self.ce.CrosRunCommand(command, return_output=True,
                                          machine=self.name,
                                          chromeos_root=self.chromeos_root)
    ifconfig_lines = if_out.splitlines()
    for marker in ("HWaddr", "ether"):
      matches = [line for line in ifconfig_lines if marker in line]
      if matches:
        self.machine_id = "_".join(matches)
        return
    assert 0, "Could not get machine_id from machine: %s" % self.name

  def __str__(self):
    return ", ".join([self.name,
                      str(self.image),
                      str(self.checksum),
                      str(self.locked),
                      str(self.released_time)])


class MachineManager(object):
  """Owns the pool of CrosMachines and hands them out to test threads.

  _all_machines holds every reachable machine that was added; _machines
  holds the subset on which an external lock_machine lock was obtained.
  Shared state is guarded by the re-entrant self._lock.
  """

  def __init__(self, chromeos_root, acquire_timeout, log_level, locks_dir,
               cmd_exec=None, lgr=None):
    self._lock = threading.RLock()
    self._all_machines = []
    self._machines = []
    self.image_lock = threading.Lock()
    self.num_reimages = 0
    self.chromeos_root = None
    self.machine_checksum = {}
    self.machine_checksum_string = {}
    self.acquire_timeout = acquire_timeout
    self.log_level = log_level
    self.locks_dir = locks_dir
    self.ce = cmd_exec or command_executer.GetCommandExecuter(
        log_level=self.log_level)
    self.logger = lgr or logger.GetLogger()

    if self.locks_dir != lock_machine.Machine.LOCKS_DIR:
      msg = ("WARNING: If you use your own locks directory, there is no"
             " guarantee that someone else might not hold a lock on the same"
             " machine in a different locks directory.")
      self.logger.LogOutput(msg)

    if not os.path.isdir(self.locks_dir):
      raise MissingLocksDirectory("Cannot access locks directory: %s"
                                  % self.locks_dir)
    self._initialized_machines = []
    self.chromeos_root = chromeos_root

  def _PushImage(self, image_chromeos_args):
    """Run image_chromeos with the given args; return its exit status."""
    if self.log_level != "verbose":
      self.logger.LogOutput("Pushing image onto machine.")
      self.logger.LogOutput("CMD : python %s "
                            % " ".join(image_chromeos_args))
    return self.ce.RunCommand(" ".join(["python"] + image_chromeos_args))

  def ImageMachine(self, machine, label):
    """Install label's image on machine unless it is already there.

    On a failed first attempt the machine is rebooted and imaging is
    retried once after 60 seconds.

    Returns:
      None if the machine's checksum already matches the image, else the
      exit status of the last image_chromeos run (0 on success).

    Raises:
      Exception: if both imaging attempts fail.
    """
    if label.image_type == "local":
      checksum = ImageChecksummer().Checksum(label, self.log_level)
    elif label.image_type == "trybot":
      checksum = machine._GetMD5Checksum(label.chromeos_image)
    else:
      checksum = None

    if checksum and (machine.checksum == checksum):
      return
    chromeos_root = label.chromeos_root or self.chromeos_root
    image_chromeos_args = [image_chromeos.__file__,
                           "--chromeos_root=%s" % chromeos_root,
                           "--image=%s" % label.chromeos_image,
                           "--image_args=%s" % label.image_args,
                           "--remote=%s" % machine.name,
                           "--logging_level=%s" % self.log_level]
    if label.board:
      image_chromeos_args.append("--board=%s" % label.board)

    # Currently can't image two machines at once,
    # so imaging has to be serialized on this lock.
    save_ce_log_level = self.ce.log_level
    if self.log_level != "verbose":
      self.ce.log_level = "average"

    # Restore the executer's log level even if imaging raises.
    try:
      with self.image_lock:
        retval = self._PushImage(image_chromeos_args)
        if retval:
          cmd = "reboot && exit"
          if self.log_level != "verbose":
            self.logger.LogOutput("reboot & exit.")
          self.ce.CrosRunCommand(cmd, machine=machine.name,
                                 chromeos_root=self.chromeos_root)
          time.sleep(60)
          retval = self._PushImage(image_chromeos_args)
        if retval:
          raise Exception("Could not image machine: '%s'." % machine.name)
        self.num_reimages += 1
        machine.checksum = checksum
        machine.image = label.chromeos_image
    finally:
      self.ce.log_level = save_ce_log_level
    return retval

  def ComputeCommonCheckSum(self, label):
    """Record the first non-empty machine_checksum among label's machines."""
    for machine in self.GetMachines(label):
      if machine.machine_checksum:
        self.machine_checksum[label.name] = machine.machine_checksum
        break

  def ComputeCommonCheckSumString(self, label):
    """Record the first non-empty checksum_string among label's machines."""
    for machine in self.GetMachines(label):
      if machine.checksum_string:
        self.machine_checksum_string[label.name] = machine.checksum_string
        break

  def _TryToLockMachine(self, cros_machine):
    """Take the external lock on cros_machine unless already held.

    On success the machine is added to self._machines and its image
    checksum is read from CHECKSUM_FILE on the device (if present).
    """
    with self._lock:
      assert cros_machine, "Machine can't be None"
      if any(m.name == cros_machine.name for m in self._machines):
        return
      # NOTE(review): Lock(True, sys.argv[0]) presumably requests an
      # exclusive lock tagged with this script's name; confirm in
      # lock_machine.
      locked = lock_machine.Machine(cros_machine.name,
                                    self.locks_dir).Lock(True, sys.argv[0])
      if locked:
        self._machines.append(cros_machine)
        command = "cat %s" % CHECKSUM_FILE
        ret, out, _ = self.ce.CrosRunCommand(
            command, return_output=True, chromeos_root=self.chromeos_root,
            machine=cros_machine.name)
        if ret == 0:
          cros_machine.checksum = out.strip()
      else:
        self.logger.LogOutput("Couldn't lock: %s" % cros_machine.name)

  # This is called from single threaded mode.
  def AddMachine(self, machine_name):
    """Probe machine_name and add it to the pool if it is reachable."""
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      if self.log_level != "verbose":
        self.logger.LogOutput("Setting up remote access to %s"
                              % machine_name)
        self.logger.LogOutput("Checking machine characteristics for %s"
                              % machine_name)
      cm = CrosMachine(machine_name, self.chromeos_root, self.log_level)
      # Unreachable machines have no machine_checksum and are skipped.
      if cm.machine_checksum:
        self._all_machines.append(cm)

  def AreAllMachineSame(self, label):
    """Return True iff all of label's machines share one machine_checksum."""
    checksums = [m.machine_checksum for m in self.GetMachines(label)]
    return len(set(checksums)) == 1

  def RemoveMachine(self, machine_name):
    """Drop machine_name from the locked set and release its external lock."""
    with self._lock:
      self._machines = [m for m in self._machines
                        if m.name != machine_name]
      res = lock_machine.Machine(machine_name,
                                 self.locks_dir).Unlock(True)

      if not res:
        self.logger.LogError("Could not unlock machine: '%s'."
                             % machine_name)

  def ForceSameImageToAllMachines(self, label):
    """Image every machine of label and refresh its checksum info."""
    machines = self.GetMachines(label)
    for m in machines:
      self.ImageMachine(m, label)
      m.SetUpChecksumInfo()

  def AcquireMachine(self, chromeos_image, label, throw=False):
    """Hand out an unlocked machine for label, preferring cheap re-use.

    Preference order among locked-in, not-handed-out machines:
      1. one whose image checksum equals the requested image's checksum;
      2. one with no known image checksum;
      3. one released between 8 and 15 seconds ago (machines released
         more than 15 seconds ago get their released_time reset instead).

    Returns:
      The acquired CrosMachine (marked locked, test_run set to the calling
      thread), or None if no machine qualifies right now.

    Raises:
      NonMatchingMachines: if throw is True and label's machines differ.
    """
    if label.image_type == "local":
      image_checksum = ImageChecksummer().Checksum(label, self.log_level)
    elif label.image_type == "trybot":
      image_checksum = hashlib.md5(chromeos_image).hexdigest()
    else:
      image_checksum = None
    machines = self.GetMachines(label)
    check_interval_time = 120
    with self._lock:
      # Lazily take the external locks on this label's machines.
      while self.acquire_timeout >= 0:
        for m in machines:
          new_machine = m not in self._all_machines
          self._TryToLockMachine(m)
          if new_machine:
            m.released_time = time.time()
        if not self.AreAllMachineSame(label):
          if not throw:
            # Log fatal message, which calls sys.exit. Default behavior.
            self.logger.LogFatal("-- not all the machines are identical")
          else:
            # Raise an exception, which can be caught and handled by calling
            # function.
            raise NonMatchingMachines("Not all the machines are identical")
        if self.GetAvailableMachines(label):
          break
        sleep_time = max(1, min(self.acquire_timeout, check_interval_time))
        time.sleep(sleep_time)
        self.acquire_timeout -= sleep_time

      if self.acquire_timeout < 0:
        machine_names = [machine.name for machine in machines]
        self.logger.LogFatal("Could not acquire any of the "
                             "following machines: '%s'"
                             % ", ".join(machine_names))

      # Every branch below that locks a machine returns immediately, so
      # this list stays valid for all three passes.
      unlocked = [machine for machine in self.GetAvailableMachines(label)
                  if not machine.locked]
      for m in unlocked:
        if image_checksum and (m.checksum == image_checksum):
          m.locked = True
          m.test_run = threading.current_thread()
          return m
      for m in unlocked:
        if not m.checksum:
          m.locked = True
          m.test_run = threading.current_thread()
          return m
      # This logic ensures that threads waiting on a machine will get a machine
      # with a checksum equal to their image over other threads. This saves time
      # when crosperf initially assigns the machines to threads by minimizing
      # the number of re-images.
      # TODO(asharif): If we centralize the thread-scheduler, we wont need this
      # code and can implement minimal reimaging code more cleanly.
      for m in unlocked:
        if time.time() - m.released_time > 15:
          # The release time gap is too large, so it is probably in the start
          # stage, we need to reset the released_time.
          m.released_time = time.time()
        elif time.time() - m.released_time > 8:
          m.locked = True
          m.test_run = threading.current_thread()
          return m
    return None

  def GetAvailableMachines(self, label=None):
    """Return externally-locked machines, optionally only label's."""
    if not label:
      return self._machines
    return [m for m in self._machines if m.name in label.remote]

  def GetMachines(self, label=None):
    """Return all added machines, optionally only label's."""
    if not label:
      return self._all_machines
    return [m for m in self._all_machines if m.name in label.remote]

  def ReleaseMachine(self, machine):
    """Return a machine handed out by AcquireMachine to the pool."""
    with self._lock:
      for m in self._machines:
        if machine.name == m.name:
          assert m.locked, "Tried to double-release %s" % m.name
          m.released_time = time.time()
          m.locked = False
          m.status = "Available"
          break

  def Cleanup(self):
    """Release the external locks on every locked-in machine."""
    with self._lock:
      # Unlock all machines.
      for m in self._machines:
        res = lock_machine.Machine(m.name, self.locks_dir).Unlock(True)

        if not res:
          self.logger.LogError("Could not unlock machine: '%s'."
                               % m.name)

  def __str__(self):
    with self._lock:
      lines = ["MachineManager Status:"]
      lines.extend(str(m) for m in self._machines)
      return "\n".join(lines)

  def AsString(self):
    """Return a table of locked-in machines, their threads and status."""
    with self._lock:
      stringify_fmt = "%-30s %-10s %-4s %-25s %-32s"
      header = stringify_fmt % ("Machine", "Thread", "Lock", "Status",
                                "Checksum")
      table = [header]
      for m in self._machines:
        if m.test_run:
          test_name = m.test_run.name
          test_status = m.test_run.timeline.GetLastEvent()
        else:
          test_name = ""
          test_status = ""

        try:
          machine_string = stringify_fmt % (m.name,
                                            test_name,
                                            m.locked,
                                            test_status,
                                            m.checksum)
        except Exception:
          # Best effort: a row that cannot be formatted is shown empty.
          machine_string = ""
        table.append(machine_string)
      return "Machine Status:\n%s" % "\n".join(table)

  def GetAllCPUInfo(self, labels):
    """Get cpuinfo for labels, merge them if their cpuinfo are the same.

    For each label the cpuinfo of its first machine in _all_machines is
    used.  Returns one section per distinct cpuinfo text: the label names
    separated by spaces, a dashed separator line, then the cpuinfo.
    """
    dic = {}
    for label in labels:
      for machine in self._all_machines:
        if machine.name in label.remote:
          dic.setdefault(machine.cpuinfo, []).append(label.name)
          break
    sections = []
    for key, v in dic.items():
      sections.append(" ".join(v) + "\n-------------------\n" + key +
                      "\n\n\n")
    return "".join(sections)


class MockCrosMachine(CrosMachine):
  """CrosMachine stand-in for tests; never touches a real device."""

  def __init__(self, name, chromeos_root, log_level):
    self.name = name
    self.image = None
    self.checksum = None
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    self.checksum_string = re.sub(r"\d", "", name)
    # In test, we assume "lumpy1", "lumpy2" are the same machine.
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
    self.log_level = log_level

  def IsReachable(self):
    return True


class MockMachineManager(MachineManager):
  """MachineManager stand-in for tests; no locking or imaging is done."""

  def __init__(self, chromeos_root, acquire_timeout, log_level,
               locks_dir=None):
    # The base class requires locks_dir; default to the standard one.
    if locks_dir is None:
      locks_dir = lock_machine.Machine.LOCKS_DIR
    super(MockMachineManager, self).__init__(chromeos_root, acquire_timeout,
                                             log_level, locks_dir)

  def _TryToLockMachine(self, cros_machine):
    self._machines.append(cros_machine)
    cros_machine.checksum = ""

  def AddMachine(self, machine_name):
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      cm = MockCrosMachine(machine_name, self.chromeos_root, self.log_level)
      assert cm.machine_checksum, ("Could not find checksum for machine %s" %
                                   machine_name)
      self._all_machines.append(cm)

  def AcquireMachine(self, chromeos_image, label, throw=False):
    for machine in self._all_machines:
      if not machine.locked:
        machine.locked = True
        return machine
    return None

  def ImageMachine(self, machine_name, label):
    return 0

  def ReleaseMachine(self, machine):
    machine.locked = False

  def GetMachines(self, label):
    return self._all_machines

  def GetAvailableMachines(self, label):
    return self._all_machines