machine_manager.py revision 0cc4e7790afbd514675801a1ffb90517c147270f
1#!/usr/bin/python 2 3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7import hashlib 8import image_chromeos 9import lock_machine 10import math 11import os.path 12import re 13import sys 14import threading 15import time 16 17from utils import command_executer 18from utils import logger 19from utils.file_utils import FileUtils 20 21from image_checksummer import ImageChecksummer 22 23CHECKSUM_FILE = "/usr/local/osimage_checksum_file" 24 25 26class CrosMachine(object): 27 def __init__(self, name, chromeos_root): 28 self.name = name 29 self.image = None 30 self.checksum = None 31 self.locked = False 32 self.released_time = time.time() 33 self.test_run = None 34 self.chromeos_root = chromeos_root 35 if not self.IsReachable(): 36 self.machine_checksum = None 37 return 38 self._GetMemoryInfo() 39 self._GetCPUInfo() 40 self._ComputeMachineChecksumString() 41 self._GetMachineID() 42 self.machine_checksum = self._GetMD5Checksum(self.checksum_string) 43 self.machine_id_checksum = self._GetMD5Checksum(self.machine_id) 44 45 def IsReachable(self): 46 ce = command_executer.GetCommandExecuter() 47 command = "ls" 48 ret = ce.CrosRunCommand(command, 49 machine=self.name, 50 chromeos_root=self.chromeos_root) 51 if ret: 52 return False 53 return True 54 55 def _ParseMemoryInfo(self): 56 line = self.meminfo.splitlines()[0] 57 usable_kbytes = int(line.split()[1]) 58 # This code is from src/third_party/test/files/client/bin/base_utils.py 59 # usable_kbytes is system's usable DRAM in kbytes, 60 # as reported by memtotal() from device /proc/meminfo memtotal 61 # after Linux deducts 1.5% to 9.5% for system table overhead 62 # Undo the unknown actual deduction by rounding up 63 # to next small multiple of a big power-of-two 64 # eg 12GB - 5.1% gets rounded back up to 12GB 65 mindeduct = 0.005 # 0.5 percent 66 maxdeduct = 0.095 # 9.5 percent 67 # deduction range 1.5% .. 9.5% supports physical mem sizes 68 # 6GB .. 12GB in steps of .5GB 69 # 12GB .. 24GB in steps of 1 GB 70 # 24GB .. 48GB in steps of 2 GB ... 71 # Finer granularity in physical mem sizes would require 72 # tighter spread between min and max possible deductions 73 74 # increase mem size by at least min deduction, without rounding 75 min_kbytes = int(usable_kbytes / (1.0 - mindeduct)) 76 # increase mem size further by 2**n rounding, by 0..roundKb or more 77 round_kbytes = int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes 78 # find least binary roundup 2**n that covers worst-cast roundKb 79 mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2))) 80 # have round_kbytes <= mod2n < round_kbytes*2 81 # round min_kbytes up to next multiple of mod2n 82 phys_kbytes = min_kbytes + mod2n - 1 83 phys_kbytes -= phys_kbytes % mod2n # clear low bits 84 self.phys_kbytes = phys_kbytes 85 86 def _GetMemoryInfo(self): 87 #TODO yunlian: when the machine in rebooting, it will not return 88 #meminfo, the assert does not catch it either 89 ce = command_executer.GetCommandExecuter() 90 command = "cat /proc/meminfo" 91 ret, self.meminfo, _ = ce.CrosRunCommand( 92 command, return_output=True, 93 machine=self.name, username="root", chromeos_root=self.chromeos_root) 94 assert ret == 0, "Could not get meminfo from machine: %s" % self.name 95 if ret == 0: 96 self._ParseMemoryInfo() 97 98 #cpuinfo format is different across architecture 99 #need to find a better way to parse it. 100 def _ParseCPUInfo(self,cpuinfo): 101 return 0 102 103 def _GetCPUInfo(self): 104 ce = command_executer.GetCommandExecuter() 105 command = "cat /proc/cpuinfo" 106 ret, self.cpuinfo, _ = ce.CrosRunCommand( 107 command, return_output=True, 108 machine=self.name, username="root", chromeos_root=self.chromeos_root) 109 assert ret == 0, "Could not get cpuinfo from machine: %s" % self.name 110 if ret == 0: 111 self._ParseCPUInfo(self.cpuinfo) 112 113 def _ComputeMachineChecksumString(self): 114 self.checksum_string = "" 115 exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"] 116 for line in self.cpuinfo.splitlines(): 117 if not any([e in line for e in exclude_lines_list]): 118 self.checksum_string += line 119 self.checksum_string += " " + str(self.phys_kbytes) 120 121 def _GetMD5Checksum(self, ss): 122 if ss: 123 return hashlib.md5(ss).hexdigest() 124 else: 125 return "" 126 127 def _GetMachineID(self): 128 ce = command_executer.GetCommandExecuter() 129 command = "dump_vpd_log --full --stdout" 130 ret, if_out, _ = ce.CrosRunCommand( 131 command, return_output=True, 132 machine=self.name, chromeos_root=self.chromeos_root) 133 b = if_out.splitlines() 134 a = [l for l in b if "Product" in l] 135 if len(a): 136 self.machine_id = a[0] 137 return 138 command = "ifconfig" 139 ret, if_out, _ = ce.CrosRunCommand( 140 command, return_output=True, 141 machine=self.name, chromeos_root=self.chromeos_root) 142 b = if_out.splitlines() 143 a = [l for l in b if "HWaddr" in l] 144 if len(a): 145 self.machine_id = "_".join(a) 146 return 147 a = [l for l in b if "ether" in l] 148 if len(a): 149 self.machine_id = "_".join(a) 150 return 151 assert 0, "Could not get machine_id from machine: %s" % self.name 152 153 def __str__(self): 154 l = [] 155 l.append(self.name) 156 l.append(str(self.image)) 157 l.append(str(self.checksum)) 158 l.append(str(self.locked)) 159 l.append(str(self.released_time)) 160 return ", ".join(l) 161 162 163class MachineManager(object): 164 def __init__(self, chromeos_root, acquire_timeout): 165 self._lock = threading.RLock() 166 self._all_machines = [] 167 self._machines = [] 168 self.image_lock = threading.Lock() 169 self.num_reimages = 0 170 self.chromeos_root = None 171 self.machine_checksum = {} 172 self.machine_checksum_string = {} 173 self.acquire_timeout = acquire_timeout 174 175 if os.path.isdir(lock_machine.Machine.LOCKS_DIR): 176 self.no_lock = False 177 else: 178 self.no_lock = True 179 self._initialized_machines = [] 180 self.chromeos_root = chromeos_root 181 182 def ImageMachine(self, machine, label): 183 if label.image_type == "local": 184 checksum = ImageChecksummer().Checksum(label) 185 elif label.image_type == "trybot": 186 checksum = machine._GetMD5Checksum(label.chromeos_image) 187 else: 188 checksum = None 189 190 if checksum and (machine.checksum == checksum): 191 return 192 chromeos_root = label.chromeos_root 193 if not chromeos_root: 194 chromeos_root = self.chromeos_root 195 image_chromeos_args = [image_chromeos.__file__, 196 "--chromeos_root=%s" % chromeos_root, 197 "--image=%s" % label.chromeos_image, 198 "--image_args=%s" % label.image_args, 199 "--remote=%s" % machine.name] 200 if label.board: 201 image_chromeos_args.append("--board=%s" % label.board) 202 203 # Currently can't image two machines at once. 204 # So have to serialized on this lock. 205 ce = command_executer.GetCommandExecuter() 206 with self.image_lock: 207 retval = ce.RunCommand(" ".join(["python"] + image_chromeos_args)) 208 if retval: 209 cmd ="reboot && exit" 210 ce.CrosRunCommand(cmd, machine=machine.name, 211 chromeos_root=self.chromeos_root) 212 time.sleep(60) 213 retval = ce.RunCommand(" ".join(["python"] + image_chromeos_args)) 214 if retval: 215 raise Exception("Could not image machine: '%s'." % machine.name) 216 else: 217 self.num_reimages += 1 218 machine.checksum = checksum 219 machine.image = label.chromeos_image 220 221 return retval 222 223 def ComputeCommonCheckSum(self, label): 224 for machine in self.GetMachines(label): 225 if machine.machine_checksum: 226 self.machine_checksum[label.name] = machine.machine_checksum 227 break 228 229 def ComputeCommonCheckSumString(self, label): 230 for machine in self.GetMachines(label): 231 if machine.checksum_string: 232 self.machine_checksum_string[label.name] = machine.checksum_string 233 break 234 235 def _TryToLockMachine(self, cros_machine): 236 with self._lock: 237 assert cros_machine, "Machine can't be None" 238 for m in self._machines: 239 if m.name == cros_machine.name: 240 return 241 if self.no_lock: 242 locked = True 243 else: 244 locked = lock_machine.Machine(cros_machine.name).Lock(True, sys.argv[0]) 245 if locked: 246 self._machines.append(cros_machine) 247 ce = command_executer.GetCommandExecuter() 248 command = "cat %s" % CHECKSUM_FILE 249 ret, out, _ = ce.CrosRunCommand( 250 command, return_output=True, chromeos_root=self.chromeos_root, 251 machine=cros_machine.name) 252 if ret == 0: 253 cros_machine.checksum = out.strip() 254 else: 255 logger.GetLogger().LogOutput("Couldn't lock: %s" % cros_machine.name) 256 257 # This is called from single threaded mode. 258 def AddMachine(self, machine_name): 259 with self._lock: 260 for m in self._all_machines: 261 assert m.name != machine_name, "Tried to double-add %s" % machine_name 262 cm = CrosMachine(machine_name, self.chromeos_root) 263 if cm.machine_checksum: 264 self._all_machines.append(cm) 265 266 def AreAllMachineSame(self, label): 267 checksums = [m.machine_checksum for m in self.GetMachines(label)] 268 return len(set(checksums)) == 1 269 270 def RemoveMachine(self, machine_name): 271 with self._lock: 272 self._machines = [m for m in self._machines 273 if m.name != machine_name] 274 res = lock_machine.Machine(machine_name).Unlock(True) 275 if not res: 276 logger.GetLogger().LogError("Could not unlock machine: '%s'." 277 % m.name) 278 279 def AcquireMachine(self, chromeos_image, label): 280 if label.image_type == "local": 281 image_checksum = ImageChecksummer().Checksum(label) 282 elif label.image_type == "trybot": 283 image_checksum = hashlib.md5(chromeos_image).hexdigest() 284 else: 285 image_checksum = None 286 machines = self.GetMachines(label) 287 check_interval_time = 120 288 with self._lock: 289 # Lazily external lock machines 290 while self.acquire_timeout >= 0: 291 for m in machines: 292 new_machine = m not in self._all_machines 293 self._TryToLockMachine(m) 294 if new_machine: 295 m.released_time = time.time() 296 if not self.AreAllMachineSame(label): 297 logger.GetLogger().LogFatal("-- not all the machine are identical") 298 if self.GetAvailableMachines(label): 299 break 300 else: 301 sleep_time = max(1, min(self.acquire_timeout, check_interval_time)) 302 time.sleep(sleep_time) 303 self.acquire_timeout -= sleep_time 304 305 if self.acquire_timeout < 0: 306 machine_names = [] 307 for machine in machines: 308 machine_names.append(machine.name) 309 logger.GetLogger().LogFatal("Could not acquire any of the " 310 "following machines: '%s'" 311 % ", ".join(machine_names)) 312 313### for m in self._machines: 314### if (m.locked and time.time() - m.released_time < 10 and 315### m.checksum == image_checksum): 316### return None 317 for m in [machine for machine in self.GetAvailableMachines(label) 318 if not machine.locked]: 319 if image_checksum and (m.checksum == image_checksum): 320 m.locked = True 321 m.test_run = threading.current_thread() 322 return m 323 for m in [machine for machine in self.GetAvailableMachines(label) 324 if not machine.locked]: 325 if not m.checksum: 326 m.locked = True 327 m.test_run = threading.current_thread() 328 return m 329 # This logic ensures that threads waiting on a machine will get a machine 330 # with a checksum equal to their image over other threads. This saves time 331 # when crosperf initially assigns the machines to threads by minimizing 332 # the number of re-images. 333 # TODO(asharif): If we centralize the thread-scheduler, we wont need this 334 # code and can implement minimal reimaging code more cleanly. 335 for m in [machine for machine in self.GetAvailableMachines(label) 336 if not machine.locked]: 337 if time.time() - m.released_time > 20: 338 m.locked = True 339 m.test_run = threading.current_thread() 340 return m 341 return None 342 343 def GetAvailableMachines(self, label=None): 344 if not label: 345 return self._machines 346 return [m for m in self._machines if m.name in label.remote] 347 348 def GetMachines(self, label=None): 349 if not label: 350 return self._all_machines 351 return [m for m in self._all_machines if m.name in label.remote] 352 353 def ReleaseMachine(self, machine): 354 with self._lock: 355 for m in self._machines: 356 if machine.name == m.name: 357 assert m.locked == True, "Tried to double-release %s" % m.name 358 m.released_time = time.time() 359 m.locked = False 360 m.status = "Available" 361 break 362 363 def Cleanup(self): 364 with self._lock: 365 # Unlock all machines. 366 for m in self._machines: 367 if not self.no_lock: 368 res = lock_machine.Machine(m.name).Unlock(True) 369 if not res: 370 logger.GetLogger().LogError("Could not unlock machine: '%s'." 371 % m.name) 372 373 def __str__(self): 374 with self._lock: 375 l = ["MachineManager Status:"] 376 for m in self._machines: 377 l.append(str(m)) 378 return "\n".join(l) 379 380 def AsString(self): 381 with self._lock: 382 stringify_fmt = "%-30s %-10s %-4s %-25s %-32s" 383 header = stringify_fmt % ("Machine", "Thread", "Lock", "Status", 384 "Checksum") 385 table = [header] 386 for m in self._machines: 387 if m.test_run: 388 test_name = m.test_run.name 389 test_status = m.test_run.timeline.GetLastEvent() 390 else: 391 test_name = "" 392 test_status = "" 393 394 try: 395 machine_string = stringify_fmt % (m.name, 396 test_name, 397 m.locked, 398 test_status, 399 m.checksum) 400 except Exception: 401 machine_string = "" 402 table.append(machine_string) 403 return "Machine Status:\n%s" % "\n".join(table) 404 405 def GetAllCPUInfo(self, labels): 406 """Get cpuinfo for labels, merge them if their cpuinfo are the same.""" 407 dic = {} 408 for label in labels: 409 for machine in self._all_machines: 410 if machine.name in label.remote: 411 if machine.cpuinfo not in dic: 412 dic[machine.cpuinfo] = [label.name] 413 else: 414 dic[machine.cpuinfo].append(label.name) 415 break 416 output = "" 417 for key, v in dic.items(): 418 output += " ".join(v) 419 output += "\n-------------------\n" 420 output += key 421 output += "\n\n\n" 422 return output 423 424 425class MockCrosMachine(CrosMachine): 426 def __init__(self, name, chromeos_root): 427 self.name = name 428 self.image = None 429 self.checksum = None 430 self.locked = False 431 self.released_time = time.time() 432 self.test_run = None 433 self.chromeos_root = chromeos_root 434 self.checksum_string = re.sub("\d", "", name) 435 #In test, we assume "lumpy1", "lumpy2" are the same machine. 436 self.machine_checksum = self._GetMD5Checksum(self.checksum_string) 437 438 439class MockMachineManager(MachineManager): 440 441 def __init__(self, chromeos_root, acquire_timeout): 442 super(MockMachineManager, self).__init__(chromeos_root, acquire_timeout) 443 444 def _TryToLockMachine(self, cros_machine): 445 self._machines.append(cros_machine) 446 cros_machine.checksum = "" 447 448 def AddMachine(self, machine_name): 449 with self._lock: 450 for m in self._all_machines: 451 assert m.name != machine_name, "Tried to double-add %s" % machine_name 452 cm = MockCrosMachine(machine_name, self.chromeos_root) 453 assert cm.machine_checksum, ("Could not find checksum for machine %s" % 454 machine_name) 455 self._all_machines.append(cm) 456 457 def AcquireMachine(self, chromeos_image, label): 458 for machine in self._all_machines: 459 if not machine.locked: 460 machine.locked = True 461 return machine 462 return None 463 464 def ImageMachine(self, machine_name, label): 465 return 0 466 467 def ReleaseMachine(self, machine): 468 machine.locked = False 469 470 def GetMachines(self, label): 471 return self._all_machines 472 473 def GetAvailableMachines(self, label): 474 return self._all_machines 475