machine_manager.py revision 4467f004e7f0854963bec90daff1879fbd9d2fec
#!/usr/bin/python
#
# Copyright 2012 Google Inc. All Rights Reserved.

"""Bookkeeping for the remote ChromeOS machines used to run tests.

CrosMachine collects identifying facts (memory, cpuinfo, ifconfig output)
from one remote machine; MachineManager keeps the pool of machines, locks
them, re-images them and hands them out to callers.
"""

import hashlib
import image_chromeos
import lock_machine
import math
import os.path
import re
import sys
import threading
import time

from utils import command_executer
from utils import logger
from utils.file_utils import FileUtils

from image_checksummer import ImageChecksummer

CHECKSUM_FILE = "/usr/local/osimage_checksum_file"


class CrosMachine(object):
    """One remote machine plus the facts gathered from it.

    Constructing an instance runs commands on the remote machine to read
    /proc/meminfo, /proc/cpuinfo and the ifconfig output, and derives MD5
    checksums from what was read.
    """

    def __init__(self, name, chromeos_root):
        """Query the machine `name` and compute its checksums.

        Args:
          name: machine name, passed as `machine=` to the command executer.
          chromeos_root: forwarded to every remote command.
        """
        self.name = name
        self.image = None
        self.checksum = None
        self.locked = False
        self.released_time = time.time()
        self.autotest_run = None
        self.chromeos_root = chromeos_root
        # Order matters: the checksum string needs cpuinfo and phys_kbytes.
        self._GetMemoryInfo()
        self._GetCPUInfo()
        self._ComputeMachineChecksumString()
        self._GetMachineID()
        self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
        self.machine_id_checksum = self._GetMD5Checksum(self.machine_id)

    def _ParseMemoryInfo(self):
        """Set self.phys_kbytes from the first line of self.meminfo.

        The second whitespace-separated field of the first meminfo line is
        taken as the usable memory in kbytes and rounded up to estimate the
        physical memory size.
        """
        line = self.meminfo.splitlines()[0]
        usable_kbytes = int(line.split()[1])
        # This code is from
        # src/third_party/autotest/files/client/bin/base_utils.py
        # usable_kbytes is the system's usable DRAM in kbytes, as reported by
        # /proc/meminfo after Linux deducts some overhead.  Undo the unknown
        # actual deduction by rounding up to the next small multiple of a
        # big power-of-two, e.g. 12GB - 5.1% gets rounded back up to 12GB.
        mindeduct = 0.005  # 0.5 percent
        maxdeduct = 0.095  # 9.5 percent
        # NOTE(review): the step table below is carried over from the
        # original comment; confirm it still holds for the bounds above.
        #    6GB .. 12GB in steps of .5GB
        #   12GB .. 24GB in steps of 1 GB
        #   24GB .. 48GB in steps of 2 GB ...
        # Finer granularity in physical mem sizes would require a tighter
        # spread between min and max possible deductions.

        # Increase mem size by at least the min deduction, without rounding.
        min_kbytes = int(usable_kbytes / (1.0 - mindeduct))
        # Increase mem size further by 2**n rounding, by 0..round_kbytes or
        # more.
        round_kbytes = int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes
        # Find the least binary roundup 2**n that covers the worst-case
        # round_kbytes.
        mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2)))
        # Have round_kbytes <= mod2n < round_kbytes*2;
        # round min_kbytes up to the next multiple of mod2n.
        phys_kbytes = min_kbytes + mod2n - 1
        phys_kbytes -= phys_kbytes % mod2n  # clear low bits
        self.phys_kbytes = phys_kbytes

    def _GetMemoryInfo(self):
        """Read /proc/meminfo from the machine into self.meminfo and parse it.

        Raises:
          AssertionError: the command failed or returned no output.
        """
        ce = command_executer.GetCommandExecuter()
        command = "cat /proc/meminfo"
        ret, self.meminfo, _ = ce.CrosRunCommand(
            command, return_output=True,
            machine=self.name, username="root",
            chromeos_root=self.chromeos_root)
        assert ret == 0, "Could not get meminfo from machine: %s" % self.name
        # TODO yunlian: a rebooting machine does not return meminfo and the
        # return-code assert above does not catch that, so check the output
        # explicitly instead of failing later with an IndexError.
        assert self.meminfo and self.meminfo.strip(), (
            "Empty meminfo from machine: %s" % self.name)
        if ret == 0:
            self._ParseMemoryInfo()

    def _ParseCPUInfo(self, cpuinfo):
        """Placeholder parser; currently ignores `cpuinfo` and returns 0.

        The cpuinfo format is different across architectures; a better way
        to parse it is still needed.
        """
        return 0

    def _GetCPUInfo(self):
        """Read /proc/cpuinfo from the machine into self.cpuinfo.

        Raises:
          AssertionError: the command returned a non-zero status.
        """
        ce = command_executer.GetCommandExecuter()
        command = "cat /proc/cpuinfo"
        ret, self.cpuinfo, _ = ce.CrosRunCommand(
            command, return_output=True,
            machine=self.name, username="root",
            chromeos_root=self.chromeos_root)
        assert ret == 0, "Could not get cpuinfo from machine: %s" % self.name
        if ret == 0:
            self._ParseCPUInfo(self.cpuinfo)

    def _ComputeMachineChecksumString(self):
        """Build self.checksum_string from cpuinfo and phys_kbytes.

        cpuinfo lines containing "MHz", "BogoMIPS" or "bogomips" are left
        out (presumably because those values vary between otherwise
        identical machines -- confirm); the remaining lines are concatenated
        without separators, followed by a space and phys_kbytes.
        """
        exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"]
        kept_lines = [line for line in self.cpuinfo.splitlines()
                      if not any(e in line for e in exclude_lines_list)]
        self.checksum_string = "".join(kept_lines)
        self.checksum_string += " " + str(self.phys_kbytes)

    def _GetMD5Checksum(self, ss):
        """Return the hex MD5 digest of `ss`, or "" when `ss` is empty/None.

        Text that is not already a byte string is encoded as UTF-8 first,
        because hashlib only accepts bytes on Python 3.
        """
        if not ss:
            return ""
        if not isinstance(ss, bytes):
            ss = ss.encode("utf-8")
        return hashlib.md5(ss).hexdigest()

    def _GetMachineID(self):
        """Set self.machine_id to the first ifconfig line containing "lan".

        Raises:
          AssertionError: ifconfig failed or printed no line with "lan".
        """
        ce = command_executer.GetCommandExecuter()
        command = "ifconfig"
        ret, if_out, _ = ce.CrosRunCommand(
            command, return_output=True,
            machine=self.name, chromeos_root=self.chromeos_root)
        # Check the exit status before touching the output, so a failed
        # command is reported as such rather than as an IndexError.
        assert ret == 0, (
            "Could not get machine_id from machine: %s" % self.name)
        lan_lines = [l for l in if_out.splitlines() if "lan" in l]
        assert lan_lines, (
            "No 'lan' line in ifconfig output of machine: %s" % self.name)
        self.machine_id = lan_lines[0]

    def __str__(self):
        """Return name, image, checksum, locked and released_time."""
        fields = [self.name,
                  str(self.image),
                  str(self.checksum),
                  str(self.locked),
                  str(self.released_time)]
        return ", ".join(fields)


class MachineManager(object):
    """Keeps the pool of machines and hands them out to callers.

    `_all_machines` holds every machine added through AddMachine;
    `_machines` holds those that were successfully locked.  `_lock` guards
    the pool; `image_lock` serializes re-imaging.
    """

    def __init__(self, chromeos_root):
        """Create an empty pool.

        Args:
          chromeos_root: default chromeos root for remote commands and
            imaging when a label does not supply its own.
        """
        self._lock = threading.RLock()
        self._all_machines = []
        self._machines = []
        self.image_lock = threading.Lock()
        self.num_reimages = 0
        self.machine_checksum = {}
        self.machine_checksum_string = {}

        # External locking is skipped when the locks directory is missing.
        self.no_lock = not os.path.isdir(lock_machine.Machine.LOCKS_DIR)
        self._initialized_machines = []
        self.chromeos_root = chromeos_root

    def ImageMachine(self, machine, label):
        """Image `machine` with label.chromeos_image unless already there.

        Returns:
          None when the machine's checksum already matches the image,
          otherwise the (zero) return value of the imaging command.

        Raises:
          Exception: the imaging command returned a non-zero status.
        """
        checksum = ImageChecksummer().Checksum(label.chromeos_image)
        if machine.checksum == checksum:
            return
        chromeos_root = label.chromeos_root or self.chromeos_root
        image_chromeos_args = [image_chromeos.__file__,
                               "--chromeos_root=%s" % chromeos_root,
                               "--image=%s" % label.chromeos_image,
                               "--image_args=%s" % label.image_args,
                               "--remote=%s" % machine.name]
        if label.board:
            image_chromeos_args.append("--board=%s" % label.board)

        # Currently can't image two machines at once.
        # So have to serialize on this lock.
        ce = command_executer.GetCommandExecuter()
        with self.image_lock:
            retval = ce.RunCommand(" ".join(["python"] + image_chromeos_args))
            if retval:
                raise Exception(
                    "Could not image machine: '%s'." % machine.name)
            self.num_reimages += 1
            machine.checksum = checksum
            machine.image = label.chromeos_image

        return retval

    def ComputeCommonCheckSum(self, label):
        """Record the first non-empty machine checksum found for `label`."""
        for machine in self.GetMachines(label):
            if machine.machine_checksum:
                self.machine_checksum[label.name] = machine.machine_checksum
                break

    def ComputeCommonCheckSumString(self, label):
        """Record the first non-empty checksum string found for `label`."""
        for machine in self.GetMachines(label):
            if machine.checksum_string:
                self.machine_checksum_string[label.name] = (
                    machine.checksum_string)
                break

    def _TryToLockMachine(self, cros_machine):
        """Lock `cros_machine` and, on success, add it to the locked pool.

        After a successful lock the image checksum stored in CHECKSUM_FILE
        on the machine is read into cros_machine.checksum (left unchanged
        if the read fails).  A failed lock is only logged.
        """
        with self._lock:
            assert cros_machine, "Machine can't be None"
            for m in self._machines:
                assert m.name != cros_machine.name, (
                    "Tried to double-lock %s" % cros_machine.name)
            if self.no_lock:
                locked = True
            else:
                locked = lock_machine.Machine(cros_machine.name).Lock(
                    True, sys.argv[0])
            if not locked:
                logger.GetLogger().LogOutput(
                    "Couldn't lock: %s" % cros_machine.name)
                return
            self._machines.append(cros_machine)
            ce = command_executer.GetCommandExecuter()
            command = "cat %s" % CHECKSUM_FILE
            ret, out, _ = ce.CrosRunCommand(
                command, return_output=True,
                chromeos_root=self.chromeos_root,
                machine=cros_machine.name)
            if ret == 0:
                cros_machine.checksum = out.strip()

    # This is called from single threaded mode.
    def AddMachine(self, machine_name):
        """Create a CrosMachine for `machine_name` and add it to the pool.

        Raises:
          AssertionError: the name was already added, or no machine
            checksum could be computed.
        """
        with self._lock:
            for m in self._all_machines:
                assert m.name != machine_name, (
                    "Tried to double-add %s" % machine_name)
            cm = CrosMachine(machine_name, self.chromeos_root)
            assert cm.machine_checksum, (
                "Could not find checksum for machine %s" % machine_name)
            self._all_machines.append(cm)

    def AreAllMachineSame(self, label):
        """Return True iff the label's machines share exactly one checksum."""
        checksums = [m.machine_checksum for m in self.GetMachines(label)]
        return len(set(checksums)) == 1

    def AcquireMachine(self, chromeos_image, label):
        """Pick an unlocked machine for `label` and mark it locked.

        Machines are preferred in this order: one whose image checksum
        already matches `chromeos_image`, one with no image checksum, then
        one released more than 20 seconds ago.

        Returns:
          The chosen machine (locked, with autotest_run set to the current
          thread), or None when no machine qualifies right now.
        """
        image_checksum = ImageChecksummer().Checksum(chromeos_image)
        machines = self.GetMachines(label)
        with self._lock:
            # Lazily external lock machines
            for m in machines:
                if m not in self._initialized_machines:
                    self._initialized_machines.append(m)
                    self._TryToLockMachine(m)
                    m.released_time = time.time()
            if not self.AreAllMachineSame(label):
                logger.GetLogger().LogFatal(
                    "-- not all the machine are identical")
            if not self.GetAvailableMachines(label):
                machine_names = [machine.name for machine in machines]
                logger.GetLogger().LogFatal("Could not acquire any of the "
                                            "following machines: '%s'"
                                            % ", ".join(machine_names))

            # Nothing below changes a machine without returning right away,
            # so one snapshot of the unlocked machines serves all passes.
            unlocked = [machine
                        for machine in self.GetAvailableMachines(label)
                        if not machine.locked]

            def _Claim(machine):
                machine.locked = True
                machine.autotest_run = threading.current_thread()
                return machine

            for m in unlocked:
                if m.checksum == image_checksum:
                    return _Claim(m)
            for m in unlocked:
                if not m.checksum:
                    return _Claim(m)
            # This logic ensures that threads waiting on a machine will get
            # a machine with a checksum equal to their image over other
            # threads. This saves time when crosperf initially assigns the
            # machines to threads by minimizing the number of re-images.
            # TODO(asharif): If we centralize the thread-scheduler, we wont
            # need this code and can implement minimal reimaging code more
            # cleanly.
            for m in unlocked:
                if time.time() - m.released_time > 20:
                    return _Claim(m)
        return None

    def GetAvailableMachines(self, label=None):
        """Return locked-pool machines, filtered by label.remote if given."""
        if not label:
            return self._machines
        return [m for m in self._machines if m.name in label.remote]

    def GetMachines(self, label=None):
        """Return all added machines, filtered by label.remote if given."""
        if not label:
            return self._all_machines
        return [m for m in self._all_machines if m.name in label.remote]

    def ReleaseMachine(self, machine):
        """Mark the pool machine with the same name as `machine` unlocked.

        Raises:
          AssertionError: the machine was not locked.
        """
        with self._lock:
            for m in self._machines:
                if machine.name == m.name:
                    assert m.locked, "Tried to double-release %s" % m.name
                    m.released_time = time.time()
                    m.locked = False
                    m.status = "Available"
                    break

    def Cleanup(self):
        """Release the external lock of every machine in the locked pool."""
        with self._lock:
            if self.no_lock:
                return
            # Unlock all machines.
            for m in self._machines:
                res = lock_machine.Machine(m.name).Unlock(True)
                if not res:
                    logger.GetLogger().LogError(
                        "Could not unlock machine: '%s'." % m.name)

    def __str__(self):
        """Return a header line followed by one line per locked machine."""
        with self._lock:
            lines = ["MachineManager Status:"]
            lines.extend(str(m) for m in self._machines)
            return "\n".join(lines)

    def AsString(self):
        """Return a table of machine, thread, lock, status and checksum."""
        with self._lock:
            stringify_fmt = "%-30s %-10s %-4s %-25s %-32s"
            header = stringify_fmt % ("Machine", "Thread", "Lock", "Status",
                                      "Checksum")
            table = [header]
            for m in self._machines:
                if m.autotest_run:
                    autotest_name = m.autotest_run.name
                    autotest_status = m.autotest_run.timeline.GetLastEvent()
                else:
                    autotest_name = ""
                    autotest_status = ""

                # Best effort: a row that cannot be formatted is shown as an
                # empty line rather than aborting the status report.
                try:
                    machine_string = stringify_fmt % (m.name,
                                                      autotest_name,
                                                      m.locked,
                                                      autotest_status,
                                                      m.checksum)
                except Exception:
                    machine_string = ""
                table.append(machine_string)
            return "Machine Status:\n%s" % "\n".join(table)

    def GetAllCPUInfo(self, labels):
        """Get cpuinfo for labels, merge them if their cpuinfo are the same.

        Only the first machine matching each label is considered.
        """
        dic = {}
        for label in labels:
            for machine in self._all_machines:
                if machine.name in label.remote:
                    dic.setdefault(machine.cpuinfo, []).append(label.name)
                    break
        sections = []
        for key, v in dic.items():
            sections.append(" ".join(v))
            sections.append("\n-------------------\n")
            sections.append(key)
            sections.append("\n\n\n")
        return "".join(sections)


class MockCrosMachine(CrosMachine):
    """CrosMachine stand-in for tests that never contacts a machine."""

    def __init__(self, name, chromeos_root):
        """Set up the fields without running any remote command.

        CrosMachine.__init__ is deliberately not called, since it queries
        the remote machine.
        """
        self.name = name
        self.image = None
        self.checksum = None
        self.locked = False
        self.released_time = time.time()
        self.autotest_run = None
        self.chromeos_root = chromeos_root
        # In test, we assume "lumpy1", "lumpy2" are the same machine, so the
        # digits are dropped from the name before hashing.
        self.checksum_string = re.sub(r"\d", "", name)
        self.machine_checksum = self._GetMD5Checksum(self.checksum_string)


class MockMachineManager(MachineManager):
    """MachineManager stand-in for tests; no locking, imaging or remote I/O."""

    def __init__(self, chromeos_root):
        super(MockMachineManager, self).__init__(chromeos_root)

    def _TryToLockMachine(self, cros_machine):
        """Add the machine to the locked pool with an empty checksum."""
        self._machines.append(cros_machine)
        cros_machine.checksum = ""

    def AddMachine(self, machine_name):
        """Create a MockCrosMachine for `machine_name` and add it.

        Raises:
          AssertionError: the name was already added, or its checksum is
            empty.
        """
        with self._lock:
            for m in self._all_machines:
                assert m.name != machine_name, (
                    "Tried to double-add %s" % machine_name)
            cm = MockCrosMachine(machine_name, self.chromeos_root)
            assert cm.machine_checksum, (
                "Could not find checksum for machine %s" % machine_name)
            self._all_machines.append(cm)

    def AcquireMachine(self, chromeos_image, label):
        """Lock and return the first unlocked machine, or None."""
        for machine in self._all_machines:
            if not machine.locked:
                machine.locked = True
                return machine
        return None

    def ImageMachine(self, machine_name, label):
        """Pretend imaging succeeded."""
        return 0

    def ReleaseMachine(self, machine):
        """Mark `machine` unlocked."""
        machine.locked = False

    def GetMachines(self, label=None):
        """Return every added machine, ignoring `label`."""
        return self._all_machines

    def GetAvailableMachines(self, label=None):
        """Return every added machine, ignoring `label`."""
        return self._all_machines