#!/usr/bin/python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# machine_manager.py revision c454cee542ca459ef9bd87c9f72e81c822caf1e5

"""Bookkeeping for remote ChromeOS test machines.

CrosMachine probes a single remote machine (memory, cpuinfo, machine id) and
MachineManager tracks a pool of them: locking, imaging, handing machines out
to threads and taking them back. Mock variants that perform no remote I/O are
provided at the bottom of the file.
"""

import hashlib
import image_chromeos
import lock_machine
import math
import os.path
import re
import sys
import threading
import time

from utils import command_executer
from utils import logger
from utils.file_utils import FileUtils

from image_checksummer import ImageChecksummer

CHECKSUM_FILE = "/usr/local/osimage_checksum_file"


def _ToBytes(text):
  """Return |text| as a byte string, UTF-8 encoding it if it is not one.

  hashlib only accepts byte strings on Python 3; on Python 2 a plain str is
  already a byte string and is returned untouched.
  """
  if isinstance(text, bytes):
    return text
  return text.encode("utf-8")


class CrosMachine(object):
  """State and hardware fingerprint of one remote machine.

  Attributes set by the constructor:
    name: the machine name passed in.
    image: last image pushed by MachineManager.ImageMachine (initially None).
    checksum: checksum of the image on the machine (initially None).
    locked: True while a thread holds the machine.
    released_time: time.time() value of the last release (or construction).
    test_run: thread currently using the machine, if any.
    machine_checksum: MD5 of the cpuinfo/memory fingerprint, or None when the
        machine could not be reached.
  """

  def __init__(self, name, chromeos_root, log_level):
    """Probe the machine and compute its checksums.

    Args:
      name: name of the remote machine.
      chromeos_root: passed through to the command executer.
      log_level: log level passed to the command executer.

    When the machine is not reachable only machine_checksum is set (to None)
    and the remaining probe attributes are left undefined.
    """
    self.name = name
    self.image = None
    self.checksum = None
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    self.log_level = log_level
    if not self.IsReachable():
      self.machine_checksum = None
      return
    self._GetMemoryInfo()
    self._GetCPUInfo()
    self._ComputeMachineChecksumString()
    self._GetMachineID()
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
    self.machine_id_checksum = self._GetMD5Checksum(self.machine_id)

  def IsReachable(self):
    """Return True if running "ls" on the machine reports no error."""
    ce = command_executer.GetCommandExecuter(log_level=self.log_level)
    command = "ls"
    ret = ce.CrosRunCommand(command,
                            machine=self.name,
                            chromeos_root=self.chromeos_root)
    return not ret

  def _ParseMemoryInfo(self):
    """Derive self.phys_kbytes from the first line of self.meminfo."""
    line = self.meminfo.splitlines()[0]
    usable_kbytes = int(line.split()[1])
    # This code is from src/third_party/test/files/client/bin/base_utils.py
    # usable_kbytes is system's usable DRAM in kbytes,
    #   as reported by memtotal() from device /proc/meminfo memtotal
    #   after Linux deducts 1.5% to 9.5% for system table overhead
    # Undo the unknown actual deduction by rounding up
    #   to next small multiple of a big power-of-two
    #   eg  12GB - 5.1% gets rounded back up to 12GB
    # NOTE(review): the comments here speak of a 1.5% lower bound while
    # mindeduct is 0.5% -- confirm which one is intended.
    mindeduct = 0.005  # 0.5 percent
    maxdeduct = 0.095  # 9.5 percent
    # deduction range 1.5% .. 9.5% supports physical mem sizes
    #    6GB .. 12GB in steps of .5GB
    #   12GB .. 24GB in steps of 1 GB
    #   24GB .. 48GB in steps of 2 GB ...
    # Finer granularity in physical mem sizes would require
    #   tighter spread between min and max possible deductions

    # increase mem size by at least min deduction, without rounding
    min_kbytes = int(usable_kbytes / (1.0 - mindeduct))
    # increase mem size further by 2**n rounding, by 0..roundKb or more
    round_kbytes = int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes
    # find least binary roundup 2**n that covers worst-case roundKb
    mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2)))
    # have round_kbytes <= mod2n < round_kbytes*2
    # round min_kbytes up to next multiple of mod2n
    phys_kbytes = min_kbytes + mod2n - 1
    phys_kbytes -= phys_kbytes % mod2n  # clear low bits
    self.phys_kbytes = phys_kbytes

  def _GetMemoryInfo(self):
    """Read /proc/meminfo from the machine into self.meminfo and parse it."""
    # TODO yunlian: when the machine is rebooting, it will not return
    # meminfo, and the assert does not catch it either.
    ce = command_executer.GetCommandExecuter(log_level=self.log_level)
    command = "cat /proc/meminfo"
    ret, self.meminfo, _ = ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name, username="root", chromeos_root=self.chromeos_root)
    assert ret == 0, "Could not get meminfo from machine: %s" % self.name
    # Still needed when asserts are stripped (python -O).
    if ret == 0:
      self._ParseMemoryInfo()

  # cpuinfo format is different across architectures;
  # need to find a better way to parse it.
  def _ParseCPUInfo(self, cpuinfo):
    """Placeholder parser; ignores |cpuinfo| and returns 0."""
    return 0

  def _GetCPUInfo(self):
    """Read /proc/cpuinfo from the machine into self.cpuinfo."""
    ce = command_executer.GetCommandExecuter(log_level=self.log_level)
    command = "cat /proc/cpuinfo"
    ret, self.cpuinfo, _ = ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name, username="root", chromeos_root=self.chromeos_root)
    assert ret == 0, "Could not get cpuinfo from machine: %s" % self.name
    # Still needed when asserts are stripped (python -O).
    if ret == 0:
      self._ParseCPUInfo(self.cpuinfo)

  def _ComputeMachineChecksumString(self):
    """Build self.checksum_string from cpuinfo and the physical memory size.

    cpuinfo lines containing "MHz", "BogoMIPS" or "bogomips" are dropped; the
    remaining lines are concatenated without separators and followed by a
    space and self.phys_kbytes.
    """
    exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"]
    kept_lines = [line for line in self.cpuinfo.splitlines()
                  if not any(e in line for e in exclude_lines_list)]
    self.checksum_string = "".join(kept_lines) + " " + str(self.phys_kbytes)

  def _GetMD5Checksum(self, ss):
    """Return the MD5 hex digest of |ss|, or "" when |ss| is empty or None."""
    if ss:
      return hashlib.md5(_ToBytes(ss)).hexdigest()
    else:
      return ""

  def _GetMachineID(self):
    """Set self.machine_id from VPD data or, failing that, from ifconfig.

    Tried in order: the first "Product" line of `dump_vpd_log --full
    --stdout`, then all "HWaddr" lines of `ifconfig` joined with "_", then
    all "ether" lines joined with "_". Asserts if none of them match.
    """
    ce = command_executer.GetCommandExecuter(log_level=self.log_level)
    command = "dump_vpd_log --full --stdout"
    _, if_out, _ = ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name, chromeos_root=self.chromeos_root)
    product_lines = [l for l in if_out.splitlines() if "Product" in l]
    if product_lines:
      self.machine_id = product_lines[0]
      return
    command = "ifconfig"
    _, if_out, _ = ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name, chromeos_root=self.chromeos_root)
    ifconfig_lines = if_out.splitlines()
    for marker in ("HWaddr", "ether"):
      matches = [l for l in ifconfig_lines if marker in l]
      if matches:
        self.machine_id = "_".join(matches)
        return
    assert 0, "Could not get machine_id from machine: %s" % self.name

  def __str__(self):
    """Return name, image, checksum, locked and released_time, comma-joined."""
    fields = [self.name,
              str(self.image),
              str(self.checksum),
              str(self.locked),
              str(self.released_time)]
    return ", ".join(fields)


class MachineManager(object):
  """Tracks a pool of CrosMachine objects and hands them out to threads.

  _all_machines holds every machine added through AddMachine; _machines holds
  the subset that has been locked through _TryToLockMachine. External file
  locking through lock_machine is only used when lock_machine.Machine.LOCKS_DIR
  exists (no_lock is False).
  """

  def __init__(self, chromeos_root, acquire_timeout, log_level):
    """Create an empty manager.

    Args:
      chromeos_root: default chromeos root used for remote commands and for
          imaging when a label does not carry its own.
      acquire_timeout: time budget AcquireMachine may spend waiting; it is
          decremented by the values passed to time.sleep().
      log_level: log level; anything but "verbose" enables the extra
          progress messages emitted by this class.
    """
    self._lock = threading.RLock()
    self._all_machines = []
    self._machines = []
    self.image_lock = threading.Lock()
    self.num_reimages = 0
    self.machine_checksum = {}
    self.machine_checksum_string = {}
    self.acquire_timeout = acquire_timeout
    self.log_level = log_level

    self.no_lock = not os.path.isdir(lock_machine.Machine.LOCKS_DIR)
    self._initialized_machines = []
    self.chromeos_root = chromeos_root

  def _PushImage(self, ce, image_chromeos_args):
    """Run image_chromeos once through |ce| and return its exit status."""
    if self.log_level != "verbose":
      logger.GetLogger().LogOutput("Pushing image onto machine.")
      logger.GetLogger().LogOutput("CMD : python %s "
                                   % " ".join(image_chromeos_args))
    return ce.RunCommand(" ".join(["python"] + image_chromeos_args))

  def ImageMachine(self, machine, label):
    """Push the image described by |label| onto |machine|.

    Args:
      machine: the CrosMachine to image.
      label: object providing image_type, chromeos_image, chromeos_root,
          image_args and board.

    Returns:
      None when the machine already carries an image with the same checksum,
      otherwise the exit status of the imaging command.

    Raises:
      Exception: imaging failed, even after one reboot and retry.
    """
    if label.image_type == "local":
      checksum = ImageChecksummer().Checksum(label, self.log_level)
    elif label.image_type == "trybot":
      checksum = machine._GetMD5Checksum(label.chromeos_image)
    else:
      checksum = None

    if checksum and (machine.checksum == checksum):
      return
    chromeos_root = label.chromeos_root
    if not chromeos_root:
      chromeos_root = self.chromeos_root
    image_chromeos_args = [image_chromeos.__file__,
                           "--chromeos_root=%s" % chromeos_root,
                           "--image=%s" % label.chromeos_image,
                           "--image_args=%s" % label.image_args,
                           "--remote=%s" % machine.name,
                           "--logging_level=%s" % self.log_level]
    if label.board:
      image_chromeos_args.append("--board=%s" % label.board)

    # Currently can't image two machines at once.
    # So have to serialize on this lock.
    if self.log_level != "verbose":
      ce = command_executer.GetCommandExecuter(log_level="average")
    else:
      ce = command_executer.GetCommandExecuter()
    with self.image_lock:
      retval = self._PushImage(ce, image_chromeos_args)
      if retval:
        # First attempt failed: reboot the machine, wait, and try once more.
        cmd = "reboot && exit"
        if self.log_level != "verbose":
          logger.GetLogger().LogOutput("reboot & exit.")
        ce.CrosRunCommand(cmd, machine=machine.name,
                          chromeos_root=self.chromeos_root)
        time.sleep(60)
        retval = self._PushImage(ce, image_chromeos_args)
      if retval:
        raise Exception("Could not image machine: '%s'." % machine.name)
      else:
        self.num_reimages += 1
      machine.checksum = checksum
      machine.image = label.chromeos_image

    return retval

  def ComputeCommonCheckSum(self, label):
    """Record the first non-empty machine_checksum among |label|'s machines."""
    for machine in self.GetMachines(label):
      if machine.machine_checksum:
        self.machine_checksum[label.name] = machine.machine_checksum
        break

  def ComputeCommonCheckSumString(self, label):
    """Record the first non-empty checksum_string among |label|'s machines."""
    for machine in self.GetMachines(label):
      if machine.checksum_string:
        self.machine_checksum_string[label.name] = machine.checksum_string
        break

  def _TryToLockMachine(self, cros_machine):
    """Lock |cros_machine| and add it to the locked pool if not already there.

    On success the machine's image checksum is read from CHECKSUM_FILE on the
    machine; when that read fails the checksum is left unchanged.
    """
    with self._lock:
      assert cros_machine, "Machine can't be None"
      for m in self._machines:
        if m.name == cros_machine.name:
          return
      if self.no_lock:
        locked = True
      else:
        locked = lock_machine.Machine(cros_machine.name).Lock(True, sys.argv[0])
      if locked:
        self._machines.append(cros_machine)
        ce = command_executer.GetCommandExecuter(log_level=self.log_level)
        command = "cat %s" % CHECKSUM_FILE
        ret, out, _ = ce.CrosRunCommand(
            command, return_output=True, chromeos_root=self.chromeos_root,
            machine=cros_machine.name)
        if ret == 0:
          cros_machine.checksum = out.strip()
      else:
        logger.GetLogger().LogOutput("Couldn't lock: %s" % cros_machine.name)

  # This is called from single threaded mode.
  def AddMachine(self, machine_name):
    """Probe |machine_name| and register it if it has a machine checksum.

    Asserts when a machine with the same name was already added.
    """
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      if self.log_level != "verbose":
        logger.GetLogger().LogOutput("Setting up remote access to %s"
                                     % machine_name)
        logger.GetLogger().LogOutput("Checking machine characteristics for %s"
                                     % machine_name)
      cm = CrosMachine(machine_name, self.chromeos_root, self.log_level)
      # Unreachable machines have machine_checksum None and are skipped.
      if cm.machine_checksum:
        self._all_machines.append(cm)

  def AreAllMachineSame(self, label):
    """Return True if |label|'s machines share exactly one machine_checksum."""
    checksums = [m.machine_checksum for m in self.GetMachines(label)]
    return len(set(checksums)) == 1

  def RemoveMachine(self, machine_name):
    """Drop |machine_name| from the locked pool and release its file lock."""
    with self._lock:
      self._machines = [m for m in self._machines
                        if m.name != machine_name]
      # Without a locks directory no file lock was ever taken (see
      # _TryToLockMachine), so there is nothing to release; Cleanup applies
      # the same rule.
      if self.no_lock:
        return
      res = lock_machine.Machine(machine_name).Unlock(True)
      if not res:
        logger.GetLogger().LogError("Could not unlock machine: '%s'."
                                    % machine_name)

  def _ClaimMachine(self, machine):
    """Mark |machine| as held by the calling thread and return it."""
    machine.locked = True
    machine.test_run = threading.current_thread()
    return machine

  def AcquireMachine(self, chromeos_image, label):
    """Hand out an unlocked machine of |label| to the calling thread.

    Preference order: a machine whose image checksum matches the requested
    image, then a machine with no image checksum, then any machine released
    more than 20 seconds ago.

    Args:
      chromeos_image: image identifier; hashed for "trybot" labels.
      label: object providing image_type, remote and (for local images)
          whatever ImageChecksummer needs.

    Returns:
      The claimed CrosMachine, or None when no machine qualifies right now.
    """
    if label.image_type == "local":
      image_checksum = ImageChecksummer().Checksum(label, self.log_level)
    elif label.image_type == "trybot":
      image_checksum = hashlib.md5(_ToBytes(chromeos_image)).hexdigest()
    else:
      image_checksum = None
    machines = self.GetMachines(label)
    check_interval_time = 120
    with self._lock:
      # Lazily external lock machines
      while self.acquire_timeout >= 0:
        for m in machines:
          # NOTE(review): |machines| comes from _all_machines, so this looks
          # like it is always False -- confirm whether _machines was meant.
          new_machine = m not in self._all_machines
          self._TryToLockMachine(m)
          if new_machine:
            m.released_time = time.time()
        if not self.AreAllMachineSame(label):
          logger.GetLogger().LogFatal("-- not all the machine are identical")
        if self.GetAvailableMachines(label):
          break
        else:
          sleep_time = max(1, min(self.acquire_timeout, check_interval_time))
          time.sleep(sleep_time)
          self.acquire_timeout -= sleep_time

      if self.acquire_timeout < 0:
        machine_names = [machine.name for machine in machines]
        logger.GetLogger().LogFatal("Could not acquire any of the "
                                    "following machines: '%s'"
                                    % ", ".join(machine_names))

###      for m in self._machines:
###        if (m.locked and time.time() - m.released_time < 10 and
###            m.checksum == image_checksum):
###          return None
      # The pool cannot change while self._lock is held, so the candidate
      # list is computed once for all three passes.
      candidates = [machine for machine in self.GetAvailableMachines(label)
                    if not machine.locked]
      for m in candidates:
        if image_checksum and (m.checksum == image_checksum):
          return self._ClaimMachine(m)
      for m in candidates:
        if not m.checksum:
          return self._ClaimMachine(m)
      # This logic ensures that threads waiting on a machine will get a machine
      # with a checksum equal to their image over other threads. This saves time
      # when crosperf initially assigns the machines to threads by minimizing
      # the number of re-images.
      # TODO(asharif): If we centralize the thread-scheduler, we wont need this
      # code and can implement minimal reimaging code more cleanly.
      for m in candidates:
        if time.time() - m.released_time > 20:
          return self._ClaimMachine(m)
    return None

  def GetAvailableMachines(self, label=None):
    """Return locked-pool machines, restricted to |label|.remote if given."""
    if not label:
      return self._machines
    return [m for m in self._machines if m.name in label.remote]

  def GetMachines(self, label=None):
    """Return all added machines, restricted to |label|.remote if given."""
    if not label:
      return self._all_machines
    return [m for m in self._all_machines if m.name in label.remote]

  def ReleaseMachine(self, machine):
    """Mark |machine| as free again; asserts if it was not held."""
    with self._lock:
      for m in self._machines:
        if machine.name == m.name:
          assert m.locked, "Tried to double-release %s" % m.name
          m.released_time = time.time()
          m.locked = False
          m.status = "Available"
          break

  def Cleanup(self):
    """Release the file lock of every machine in the locked pool."""
    with self._lock:
      # Unlock all machines.
      for m in self._machines:
        if not self.no_lock:
          res = lock_machine.Machine(m.name).Unlock(True)
          if not res:
            logger.GetLogger().LogError("Could not unlock machine: '%s'."
                                        % m.name)

  def __str__(self):
    """Return a header line followed by one line per locked-pool machine."""
    with self._lock:
      lines = ["MachineManager Status:"]
      lines.extend(str(m) for m in self._machines)
      return "\n".join(lines)

  def AsString(self):
    """Return a fixed-width status table of the locked-pool machines."""
    with self._lock:
      stringify_fmt = "%-30s %-10s %-4s %-25s %-32s"
      header = stringify_fmt % ("Machine", "Thread", "Lock", "Status",
                                "Checksum")
      table = [header]
      for m in self._machines:
        if m.test_run:
          test_name = m.test_run.name
          test_status = m.test_run.timeline.GetLastEvent()
        else:
          test_name = ""
          test_status = ""

        try:
          machine_string = stringify_fmt % (m.name,
                                            test_name,
                                            m.locked,
                                            test_status,
                                            m.checksum)
        except Exception:
          # Best effort: a row that cannot be formatted is shown as blank.
          machine_string = ""
        table.append(machine_string)
      return "Machine Status:\n%s" % "\n".join(table)

  def GetAllCPUInfo(self, labels):
    """Get cpuinfo for labels, merge them if their cpuinfo are the same."""
    dic = {}
    for label in labels:
      for machine in self._all_machines:
        if machine.name in label.remote:
          # Only the first matching machine of each label is considered.
          dic.setdefault(machine.cpuinfo, []).append(label.name)
          break
    sections = []
    for key, v in dic.items():
      sections.append(" ".join(v) + "\n-------------------\n" + key + "\n\n\n")
    return "".join(sections)


class MockCrosMachine(CrosMachine):
  """CrosMachine stand-in that never contacts a remote machine."""

  def __init__(self, name, chromeos_root, log_level):
    """Initialise state from |name| alone; no remote commands are run."""
    self.name = name
    self.image = None
    self.checksum = None
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    self.checksum_string = re.sub(r"\d", "", name)
    # In test, we assume "lumpy1", "lumpy2" are the same machine.
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
    self.log_level = log_level

  def IsReachable(self):
    """Mock machines are always reachable."""
    return True


class MockMachineManager(MachineManager):
  """MachineManager stand-in built on MockCrosMachine; does no remote I/O."""

  def __init__(self, chromeos_root, acquire_timeout, log_level):
    super(MockMachineManager, self).__init__(chromeos_root, acquire_timeout,
                                             log_level)

  def _TryToLockMachine(self, cros_machine):
    """Add |cros_machine| to the locked pool and give it an empty checksum."""
    self._machines.append(cros_machine)
    cros_machine.checksum = ""

  def AddMachine(self, machine_name):
    """Register a MockCrosMachine; asserts on duplicates."""
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      cm = MockCrosMachine(machine_name, self.chromeos_root, self.log_level)
      assert cm.machine_checksum, ("Could not find checksum for machine %s" %
                                   machine_name)
      self._all_machines.append(cm)

  def AcquireMachine(self, chromeos_image, label):
    """Lock and return the first unlocked machine, or None if there is none."""
    for machine in self._all_machines:
      if not machine.locked:
        machine.locked = True
        return machine
    return None

  def ImageMachine(self, machine_name, label):
    """Pretend imaging succeeded."""
    return 0

  def ReleaseMachine(self, machine):
    """Mark |machine| as unlocked."""
    machine.locked = False

  def GetMachines(self, label=None):
    """Return every added machine; |label| is ignored."""
    return self._all_machines

  def GetAvailableMachines(self, label=None):
    """Return every added machine; |label| is ignored."""
    return self._all_machines