#!/usr/bin/python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# machine_manager.py revision 870c1847e55058db7e7cd0631f8b3c02e10dfd40

"""Tracks, locks, images and hands out remote ChromeOS test machines."""

import hashlib
import image_chromeos
import lock_machine
import math
import os.path
import re
import sys
import threading
import time

from utils import command_executer
from utils import logger
from utils.file_utils import FileUtils

from image_checksummer import ImageChecksummer

# File read from each machine (via "cat") to learn the checksum of the image
# that is currently installed on it.
CHECKSUM_FILE = "/usr/local/osimage_checksum_file"


class CrosMachine(object):
  """State and hardware fingerprint of one remote ChromeOS machine.

  Attributes set on every instance:
    name: machine name passed to the remote command executer.
    image: image last installed through MachineManager, or None.
    checksum: checksum of the installed image, or None when unknown.
    locked: True while the machine is claimed through MachineManager.
    released_time: time.time() at construction or at the last release.
    test_run: thread object that last claimed the machine, or None.
    chromeos_root: ChromeOS checkout passed to the remote command executer.
    machine_checksum: MD5 of checksum_string, or None when the machine
      could not be reached at construction time.

  Attributes set only when the machine was reachable at construction time:
    meminfo, phys_kbytes, cpuinfo, checksum_string, machine_id,
    machine_id_checksum.
  """

  def __init__(self, name, chromeos_root):
    """Probes the machine and computes its hardware checksums.

    When the machine is unreachable only machine_checksum (None) is set in
    addition to the basic bookkeeping attributes.

    Args:
      name: name of the remote machine.
      chromeos_root: ChromeOS checkout used for remote commands.
    """
    self.name = name
    self.image = None
    self.checksum = None
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    if not self.IsReachable():
      self.machine_checksum = None
      return
    self._GetMemoryInfo()
    self._GetCPUInfo()
    self._ComputeMachineChecksumString()
    self._GetMachineID()
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
    self.machine_id_checksum = self._GetMD5Checksum(self.machine_id)

  def IsReachable(self):
    """Returns True when running "ls" on the machine gives a falsy status."""
    ce = command_executer.GetCommandExecuter()
    command = "ls"
    ret = ce.CrosRunCommand(command,
                            machine=self.name,
                            chromeos_root=self.chromeos_root)
    return not ret

  def _ParseMemoryInfo(self):
    """Derives self.phys_kbytes from the first line of self.meminfo.

    The second whitespace-separated field of the first line is taken as the
    usable memory in kbytes and rounded up to an estimate of the physical
    memory size.
    """
    line = self.meminfo.splitlines()[0]
    usable_kbytes = int(line.split()[1])
    # This code is from src/third_party/test/files/client/bin/base_utils.py
    # usable_kbytes is system's usable DRAM in kbytes,
    #   as reported by memtotal() from device /proc/meminfo memtotal
    #   after Linux deducts 1.5% to 9.5% for system table overhead
    # Undo the unknown actual deduction by rounding up
    #   to next small multiple of a big power-of-two
    #   eg  12GB - 5.1% gets rounded back up to 12GB
    # NOTE(review): the value below is 0.5% while the surrounding comments
    # speak of 1.5%; left unchanged because it feeds the machine checksum.
    mindeduct = 0.005  # 0.5 percent
    maxdeduct = 0.095  # 9.5 percent
    # deduction range 1.5% .. 9.5% supports physical mem sizes
    #    6GB .. 12GB in steps of .5GB
    #   12GB .. 24GB in steps of 1 GB
    #   24GB .. 48GB in steps of 2 GB ...
    # Finer granularity in physical mem sizes would require
    #   tighter spread between min and max possible deductions

    # increase mem size by at least min deduction, without rounding
    min_kbytes = int(usable_kbytes / (1.0 - mindeduct))
    # increase mem size further by 2**n rounding, by 0..roundKb or more
    round_kbytes = int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes
    # find least binary roundup 2**n that covers worst-case roundKb
    mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2)))
    # have round_kbytes <= mod2n < round_kbytes*2
    # round min_kbytes up to next multiple of mod2n
    phys_kbytes = min_kbytes + mod2n - 1
    phys_kbytes -= phys_kbytes % mod2n  # clear low bits
    self.phys_kbytes = phys_kbytes

  def _GetMemoryInfo(self):
    """Reads /proc/meminfo into self.meminfo and parses it.

    Raises:
      AssertionError: the remote command returned a non-zero status.
    """
    #TODO yunlian: when the machine in rebooting, it will not return
    #meminfo, the assert does not catch it either
    ce = command_executer.GetCommandExecuter()
    command = "cat /proc/meminfo"
    ret, self.meminfo, _ = ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name, username="root", chromeos_root=self.chromeos_root)
    assert ret == 0, "Could not get meminfo from machine: %s" % self.name
    # Still guarded explicitly so nothing is parsed when asserts are disabled.
    if ret == 0:
      self._ParseMemoryInfo()

  #cpuinfo format is different across architecture
  #need to find a better way to parse it.
  def _ParseCPUInfo(self, cpuinfo):
    """Placeholder: cpuinfo is not parsed yet; always returns 0."""
    return 0

  def _GetCPUInfo(self):
    """Reads /proc/cpuinfo into self.cpuinfo.

    Raises:
      AssertionError: the remote command returned a non-zero status.
    """
    ce = command_executer.GetCommandExecuter()
    command = "cat /proc/cpuinfo"
    ret, self.cpuinfo, _ = ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name, username="root", chromeos_root=self.chromeos_root)
    assert ret == 0, "Could not get cpuinfo from machine: %s" % self.name
    if ret == 0:
      self._ParseCPUInfo(self.cpuinfo)

  def _ComputeMachineChecksumString(self):
    """Builds self.checksum_string from cpuinfo and phys_kbytes.

    cpuinfo lines mentioning MHz or BogoMIPS are dropped; the remaining
    lines are concatenated without a separator and followed by a space and
    the physical memory size.
    """
    exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"]
    kept_lines = [line for line in self.cpuinfo.splitlines()
                  if not any(e in line for e in exclude_lines_list)]
    self.checksum_string = "".join(kept_lines) + " " + str(self.phys_kbytes)

  def _GetMD5Checksum(self, ss):
    """Returns the hex MD5 digest of ss, or "" when ss is empty or None."""
    if not ss:
      return ""
    if not isinstance(ss, bytes):
      # hashlib only accepts bytes; text has to be encoded first.
      ss = ss.encode("utf-8")
    return hashlib.md5(ss).hexdigest()

  def _GetMachineID(self):
    """Sets self.machine_id from VPD data or, failing that, ifconfig output.

    Tried in order: the first "Product" line of dump_vpd_log, all "HWaddr"
    lines of ifconfig, all "ether" lines of ifconfig.

    Raises:
      AssertionError: none of the sources produced a matching line.
    """
    ce = command_executer.GetCommandExecuter()
    command = "dump_vpd_log --full --stdout"
    _, if_out, _ = ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name, chromeos_root=self.chromeos_root)
    product_lines = [l for l in if_out.splitlines() if "Product" in l]
    if product_lines:
      self.machine_id = product_lines[0]
      return
    command = "ifconfig"
    _, if_out, _ = ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name, chromeos_root=self.chromeos_root)
    ifconfig_lines = if_out.splitlines()
    for marker in ("HWaddr", "ether"):
      matches = [l for l in ifconfig_lines if marker in l]
      if matches:
        self.machine_id = "_".join(matches)
        return
    assert 0, "Could not get machine_id from machine: %s" % self.name

  def __str__(self):
    """Returns "name, image, checksum, locked, released_time"."""
    fields = [self.name,
              str(self.image),
              str(self.checksum),
              str(self.locked),
              str(self.released_time)]
    return ", ".join(fields)


class MachineManager(object):
  """Keeps the pool of CrosMachine objects and hands them out to threads.

  _all_machines holds every machine added through AddMachine;
  _machines holds the subset that _TryToLockMachine managed to lock.
  """

  def __init__(self, chromeos_root, acquire_timeout):
    """Creates an empty manager.

    Args:
      chromeos_root: default ChromeOS checkout for remote commands.
      acquire_timeout: seconds AcquireMachine may spend waiting for a
        machine; the remaining budget is stored on the instance and
        decreases as AcquireMachine sleeps.
    """
    self._lock = threading.RLock()
    self._all_machines = []
    self._machines = []
    self.image_lock = threading.Lock()
    self.num_reimages = 0
    self.machine_checksum = {}
    self.machine_checksum_string = {}
    self.acquire_timeout = acquire_timeout

    # External locking is skipped when the locks directory does not exist.
    self.no_lock = not os.path.isdir(lock_machine.Machine.LOCKS_DIR)
    self._initialized_machines = []
    self.chromeos_root = chromeos_root

  def ImageMachine(self, machine, label):
    """Installs label's image on machine unless it is already there.

    A failed attempt is followed by a reboot of the machine, a 60 second
    wait and one retry.

    Args:
      machine: CrosMachine to image.
      label: object providing chromeos_root, chromeos_image, image_args
        and board.

    Returns:
      None when the machine already carries the image, otherwise the
      status of the successful imaging command.

    Raises:
      Exception: imaging still failed after the retry.
    """
    checksum = ImageChecksummer().Checksum(label)
    if machine.checksum == checksum:
      return
    chromeos_root = label.chromeos_root
    if not chromeos_root:
      chromeos_root = self.chromeos_root
    image_chromeos_args = [image_chromeos.__file__,
                           "--chromeos_root=%s" % chromeos_root,
                           "--image=%s" % label.chromeos_image,
                           "--image_args=%s" % label.image_args,
                           "--remote=%s" % machine.name]
    if label.board:
      image_chromeos_args.append("--board=%s" % label.board)
    image_command = " ".join(["python"] + image_chromeos_args)

    # Currently can't image two machines at once.
    # So have to serialized on this lock.
    ce = command_executer.GetCommandExecuter()
    with self.image_lock:
      retval = ce.RunCommand(image_command)
      if retval:
        cmd = "reboot && exit"
        ce.CrosRunCommand(cmd, machine=machine.name,
                          chromeos_root=self.chromeos_root)
        time.sleep(60)
        retval = ce.RunCommand(image_command)
      if retval:
        raise Exception("Could not image machine: '%s'." % machine.name)
      else:
        self.num_reimages += 1
      machine.checksum = checksum
      machine.image = label.chromeos_image

    return retval

  def ComputeCommonCheckSum(self, label):
    """Records the first non-empty machine checksum found for label."""
    for machine in self.GetMachines(label):
      if machine.machine_checksum:
        self.machine_checksum[label.name] = machine.machine_checksum
        break

  def ComputeCommonCheckSumString(self, label):
    """Records the first non-empty checksum string found for label."""
    for machine in self.GetMachines(label):
      if machine.checksum_string:
        self.machine_checksum_string[label.name] = machine.checksum_string
        break

  def _TryToLockMachine(self, cros_machine):
    """Locks cros_machine externally and adds it to the locked pool.

    Does nothing when a machine with the same name is already in the pool.
    After a successful lock the image checksum stored on the machine is
    read into cros_machine.checksum (left untouched if the read fails).
    A failed lock is only logged.
    """
    with self._lock:
      assert cros_machine, "Machine can't be None"
      for m in self._machines:
        if m.name == cros_machine.name:
          return
      if self.no_lock:
        locked = True
      else:
        locked = lock_machine.Machine(cros_machine.name).Lock(True, sys.argv[0])
      if locked:
        self._machines.append(cros_machine)
        ce = command_executer.GetCommandExecuter()
        command = "cat %s" % CHECKSUM_FILE
        ret, out, _ = ce.CrosRunCommand(
            command, return_output=True, chromeos_root=self.chromeos_root,
            machine=cros_machine.name)
        if ret == 0:
          cros_machine.checksum = out.strip()
      else:
        logger.GetLogger().LogOutput("Couldn't lock: %s" % cros_machine.name)

  # This is called from single threaded mode.
  def AddMachine(self, machine_name):
    """Probes machine_name and registers it when a checksum was computed.

    Machines without a machine checksum (e.g. unreachable ones) are
    silently left out.

    Raises:
      AssertionError: a machine with this name was already added.
    """
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      cm = CrosMachine(machine_name, self.chromeos_root)
      if cm.machine_checksum:
        self._all_machines.append(cm)

  def AreAllMachineSame(self, label):
    """Returns True when label's machines share exactly one checksum."""
    checksums = [m.machine_checksum for m in self.GetMachines(label)]
    return len(set(checksums)) == 1

  def RemoveMachine(self, machine_name):
    """Drops machine_name from the locked pool and releases its lock.

    The external unlock is skipped when external locking is disabled,
    matching Cleanup. A failed unlock is logged.
    """
    with self._lock:
      self._machines = [m for m in self._machines
                        if m.name != machine_name]
      if self.no_lock:
        return
      res = lock_machine.Machine(machine_name).Unlock(True)
      if not res:
        logger.GetLogger().LogError("Could not unlock machine: '%s'."
                                    % machine_name)

  def _ClaimFirstMachine(self, label, is_suitable):
    """Claims the first unlocked available machine that is_suitable accepts.

    The caller must hold self._lock.

    Args:
      label: label whose available machines are considered.
      is_suitable: callable taking a machine and returning a truth value.

    Returns:
      The claimed machine (marked locked, test_run set to the current
      thread), or None when no machine qualified.
    """
    for m in self.GetAvailableMachines(label):
      if not m.locked and is_suitable(m):
        m.locked = True
        m.test_run = threading.current_thread()
        return m
    return None

  def AcquireMachine(self, chromeos_image, label):
    """Claims a machine of label for the calling thread.

    Tries to lock label's machines until at least one is available or the
    remaining acquire_timeout budget is used up, then picks, in order of
    preference: a machine already carrying label's image, a machine with
    no known image checksum, a machine released more than 20 seconds ago.

    Args:
      chromeos_image: unused by this implementation.
      label: label describing the wanted image and candidate machines.

    Returns:
      The claimed machine, or None when nothing suitable is free right now.
    """
    image_checksum = ImageChecksummer().Checksum(label)
    machines = self.GetMachines(label)
    check_interval_time = 120
    with self._lock:
      # Lazily external lock machines
      while self.acquire_timeout >= 0:
        for m in machines:
          # NOTE(review): machines comes from GetMachines(), i.e. out of
          # _all_machines, so this is always False here; the check was
          # possibly meant to be against self._machines - confirm.
          new_machine = m not in self._all_machines
          self._TryToLockMachine(m)
          if new_machine:
            m.released_time = time.time()
        if not self.AreAllMachineSame(label):
          logger.GetLogger().LogFatal("-- not all the machine are identical")
        if self.GetAvailableMachines(label):
          break
        else:
          sleep_time = max(1, min(self.acquire_timeout, check_interval_time))
          time.sleep(sleep_time)
          self.acquire_timeout -= sleep_time

      if self.acquire_timeout < 0:
        machine_names = [machine.name for machine in machines]
        logger.GetLogger().LogFatal("Could not acquire any of the "
                                    "following machines: '%s'"
                                    % ", ".join(machine_names))

###      for m in self._machines:
###        if (m.locked and time.time() - m.released_time < 10 and
###            m.checksum == image_checksum):
###          return None
      claimed = self._ClaimFirstMachine(
          label, lambda machine: machine.checksum == image_checksum)
      if claimed is not None:
        return claimed
      claimed = self._ClaimFirstMachine(
          label, lambda machine: not machine.checksum)
      if claimed is not None:
        return claimed
      # This logic ensures that threads waiting on a machine will get a machine
      # with a checksum equal to their image over other threads. This saves time
      # when crosperf initially assigns the machines to threads by minimizing
      # the number of re-images.
      # TODO(asharif): If we centralize the thread-scheduler, we wont need this
      # code and can implement minimal reimaging code more cleanly.
      return self._ClaimFirstMachine(
          label, lambda machine: time.time() - machine.released_time > 20)

  def GetAvailableMachines(self, label=None):
    """Returns the locked-pool machines, optionally limited to label.remote."""
    if not label:
      return self._machines
    return [m for m in self._machines if m.name in label.remote]

  def GetMachines(self, label=None):
    """Returns all added machines, optionally limited to label.remote."""
    if not label:
      return self._all_machines
    return [m for m in self._all_machines if m.name in label.remote]

  def ReleaseMachine(self, machine):
    """Marks the pool machine named like machine as free again.

    Raises:
      AssertionError: the machine was not marked as locked.
    """
    with self._lock:
      for m in self._machines:
        if machine.name == m.name:
          assert m.locked == True, "Tried to double-release %s" % m.name
          m.released_time = time.time()
          m.locked = False
          m.status = "Available"
          break

  def Cleanup(self):
    """Releases the external lock of every machine in the locked pool."""
    with self._lock:
      # Unlock all machines.
      for m in self._machines:
        if not self.no_lock:
          res = lock_machine.Machine(m.name).Unlock(True)
          if not res:
            logger.GetLogger().LogError("Could not unlock machine: '%s'."
                                        % m.name)

  def __str__(self):
    """Returns a header line followed by one line per pool machine."""
    with self._lock:
      l = ["MachineManager Status:"]
      for m in self._machines:
        l.append(str(m))
      return "\n".join(l)

  def AsString(self):
    """Returns a table of pool machines with thread, lock and status info."""
    with self._lock:
      stringify_fmt = "%-30s %-10s %-4s %-25s %-32s"
      header = stringify_fmt % ("Machine", "Thread", "Lock", "Status",
                                "Checksum")
      table = [header]
      for m in self._machines:
        if m.test_run:
          test_name = m.test_run.name
          test_status = m.test_run.timeline.GetLastEvent()
        else:
          test_name = ""
          test_status = ""

        # Best effort: a row that cannot be formatted becomes an empty line.
        try:
          machine_string = stringify_fmt % (m.name,
                                            test_name,
                                            m.locked,
                                            test_status,
                                            m.checksum)
        except Exception:
          machine_string = ""
        table.append(machine_string)
      return "Machine Status:\n%s" % "\n".join(table)

  def GetAllCPUInfo(self, labels):
    """Get cpuinfo for labels, merge them if their cpuinfo are the same."""
    labels_by_cpuinfo = {}
    for label in labels:
      # Only the first machine matching the label is consulted.
      for machine in self._all_machines:
        if machine.name in label.remote:
          labels_by_cpuinfo.setdefault(machine.cpuinfo, []).append(label.name)
          break
    sections = []
    for cpuinfo, label_names in labels_by_cpuinfo.items():
      sections.append("%s\n-------------------\n%s\n\n\n"
                      % (" ".join(label_names), cpuinfo))
    return "".join(sections)


class MockCrosMachine(CrosMachine):
  """CrosMachine stand-in for tests that never contacts a real machine."""

  def __init__(self, name, chromeos_root):
    """Sets the bookkeeping attributes without running remote commands.

    CrosMachine.__init__ is deliberately not called, since it would probe
    the machine.
    """
    self.name = name
    self.image = None
    self.checksum = None
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    self.checksum_string = re.sub(r"\d", "", name)
    #In test, we assume "lumpy1", "lumpy2" are the same machine.
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)


class MockMachineManager(MachineManager):
  """MachineManager for tests: no external locking, imaging or waiting."""

  def __init__(self, chromeos_root, acquire_timeout):
    super(MockMachineManager, self).__init__(chromeos_root, acquire_timeout)

  def _TryToLockMachine(self, cros_machine):
    """Adds cros_machine to the pool and gives it an empty image checksum."""
    self._machines.append(cros_machine)
    cros_machine.checksum = ""

  def AddMachine(self, machine_name):
    """Registers a MockCrosMachine called machine_name.

    Raises:
      AssertionError: the name was already added, or no machine checksum
        could be derived from it.
    """
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      cm = MockCrosMachine(machine_name, self.chromeos_root)
      assert cm.machine_checksum, ("Could not find checksum for machine %s" %
                                   machine_name)
      self._all_machines.append(cm)

  def AcquireMachine(self, chromeos_image, label):
    """Claims and returns the first unlocked machine, or None."""
    for machine in self._all_machines:
      if not machine.locked:
        machine.locked = True
        return machine
    return None

  def ImageMachine(self, machine_name, label):
    """Pretends imaging succeeded; always returns 0."""
    return 0

  def ReleaseMachine(self, machine):
    """Marks machine as unlocked."""
    machine.locked = False

  def GetMachines(self, label):
    """Returns every added machine, ignoring label."""
    return self._all_machines

  def GetAvailableMachines(self, label):
    """Returns every added machine, ignoring label."""
    return self._all_machines