#!/usr/bin/python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# machine_manager.py revision 837e07a6a2f455a4699bf4ad1b28653fde3ca3b0

"""Track, lock, image and hand out remote ChromeOS machines."""

import hashlib
import image_chromeos
import lock_machine
import math
import os.path
import re
import sys
import threading
import time

from utils import command_executer
from utils import logger
from utils.file_utils import FileUtils

from image_checksummer import ImageChecksummer

CHECKSUM_FILE = "/usr/local/osimage_checksum_file"


class CrosMachine(object):
    """Bookkeeping state and hardware fingerprint of one remote machine.

    When the machine is reachable, the constructor reads /proc/meminfo,
    /proc/cpuinfo and the VPD log from it and derives:
      machine_checksum: MD5 of the filtered cpuinfo plus rounded memory size.
      machine_id_checksum: MD5 of the first VPD line containing "Product".
    When it is not reachable, machine_checksum stays None.
    """

    def __init__(self, name, chromeos_root):
        """Probe machine |name| and compute its checksums.

        Args:
          name: Host name of the remote machine.
          chromeos_root: ChromeOS checkout passed to the command executer.
        """
        self.name = name
        self.image = None
        self.checksum = None
        self.locked = False
        self.released_time = time.time()
        self.autotest_run = None
        self.chromeos_root = chromeos_root
        # Give every probed attribute a default so that an unreachable
        # machine is still a fully formed object.
        self.meminfo = None
        self.cpuinfo = None
        self.phys_kbytes = None
        self.checksum_string = None
        self.machine_id = None
        self.machine_checksum = None
        self.machine_id_checksum = None
        if not self._IsReachable():
            return
        self._GetMemoryInfo()
        self._GetCPUInfo()
        self._ComputeMachineChecksumString()
        self._GetMachineID()
        self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
        self.machine_id_checksum = self._GetMD5Checksum(self.machine_id)

    def _IsReachable(self):
        """Return True if running "ls" on the machine returns a falsy code."""
        ce = command_executer.GetCommandExecuter()
        command = "ls"
        ret = ce.CrosRunCommand(command,
                                machine=self.name,
                                chromeos_root=self.chromeos_root)
        return not ret

    def _ParseMemoryInfo(self):
        """Set self.phys_kbytes from the first line of self.meminfo.

        The second whitespace-separated field of that line is taken as the
        usable memory in kbytes (presumably the MemTotal line -- the code
        does not check the label) and rounded up to estimate physical memory.
        """
        line = self.meminfo.splitlines()[0]
        usable_kbytes = int(line.split()[1])
        # This code is from
        # src/third_party/autotest/files/client/bin/base_utils.py
        # usable_kbytes is system's usable DRAM in kbytes,
        #   as reported by memtotal() from device /proc/meminfo memtotal
        #   after Linux deducts 1.5% to 9.5% for system table overhead
        # Undo the unknown actual deduction by rounding up
        #   to next small multiple of a big power-of-two
        #   eg  12GB - 5.1% gets rounded back up to 12GB
        # NOTE(review): the comments here cite a 1.5% minimum deduction but
        # the constant below is 0.5%; left as is because changing it would
        # change every machine checksum -- confirm which one is intended.
        mindeduct = 0.005  # 0.5 percent
        maxdeduct = 0.095  # 9.5 percent
        # deduction range 1.5% .. 9.5% supports physical mem sizes
        #    6GB .. 12GB in steps of .5GB
        #   12GB .. 24GB in steps of 1 GB
        #   24GB .. 48GB in steps of 2 GB ...
        # Finer granularity in physical mem sizes would require
        #   tighter spread between min and max possible deductions

        # increase mem size by at least min deduction, without rounding
        min_kbytes = int(usable_kbytes / (1.0 - mindeduct))
        # increase mem size further by 2**n rounding, by 0..roundKb or more
        round_kbytes = int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes
        # find least binary roundup 2**n that covers worst-case roundKb
        mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2)))
        # have round_kbytes <= mod2n < round_kbytes*2
        # round min_kbytes up to next multiple of mod2n
        phys_kbytes = min_kbytes + mod2n - 1
        phys_kbytes -= phys_kbytes % mod2n  # clear low bits
        self.phys_kbytes = phys_kbytes

    def _GetMemoryInfo(self):
        """Read /proc/meminfo from the machine and parse it."""
        # TODO yunlian: when the machine is rebooting, it will not return
        # meminfo, and the assert does not catch it either.
        ce = command_executer.GetCommandExecuter()
        command = "cat /proc/meminfo"
        ret, self.meminfo, _ = ce.CrosRunCommand(
            command, return_output=True,
            machine=self.name, username="root",
            chromeos_root=self.chromeos_root)
        assert ret == 0, "Could not get meminfo from machine: %s" % self.name
        # Still guard the parse: asserts are stripped when running with -O.
        if ret == 0:
            self._ParseMemoryInfo()

    # cpuinfo format is different across architectures;
    # need to find a better way to parse it.
    def _ParseCPUInfo(self, cpuinfo):
        """Placeholder parser: ignores |cpuinfo| and returns 0."""
        return 0

    def _GetCPUInfo(self):
        """Read /proc/cpuinfo from the machine into self.cpuinfo."""
        ce = command_executer.GetCommandExecuter()
        command = "cat /proc/cpuinfo"
        ret, self.cpuinfo, _ = ce.CrosRunCommand(
            command, return_output=True,
            machine=self.name, username="root",
            chromeos_root=self.chromeos_root)
        assert ret == 0, "Could not get cpuinfo from machine: %s" % self.name
        # Still guard the parse: asserts are stripped when running with -O.
        if ret == 0:
            self._ParseCPUInfo(self.cpuinfo)

    def _ComputeMachineChecksumString(self):
        """Build self.checksum_string from cpuinfo and phys_kbytes.

        cpuinfo lines containing "MHz", "BogoMIPS" or "bogomips" are dropped;
        the remaining lines are concatenated without a separator and followed
        by a space and the rounded memory size.
        """
        exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"]
        kept_lines = [line for line in self.cpuinfo.splitlines()
                      if not any(e in line for e in exclude_lines_list)]
        self.checksum_string = "".join(kept_lines)
        self.checksum_string += " " + str(self.phys_kbytes)

    def _GetMD5Checksum(self, ss):
        """Return the hex MD5 digest of |ss|, or "" if |ss| is empty/None.

        Text that is not already a byte string is encoded as UTF-8 first, so
        the method works on both Python 2 and Python 3 strings.
        """
        if not ss:
            return ""
        if not isinstance(ss, bytes):
            ss = ss.encode("utf-8")
        return hashlib.md5(ss).hexdigest()

    def _GetMachineID(self):
        """Set self.machine_id to the first VPD log line with "Product".

        Raises:
          AssertionError: if the dump_vpd_log command fails or its output
            has no line containing "Product".
        """
        ce = command_executer.GetCommandExecuter()
        command = "dump_vpd_log --full --stdout"
        ret, if_out, _ = ce.CrosRunCommand(
            command, return_output=True,
            machine=self.name, chromeos_root=self.chromeos_root)
        # Check the exit status before touching the output, so a failed
        # command reports this message instead of a bare IndexError.
        assert ret == 0, ("Could not get machine_id from machine: %s"
                          % self.name)
        product_lines = [l for l in if_out.splitlines() if "Product" in l]
        assert product_lines, ("Could not find a Product line in the VPD "
                               "log of machine: %s" % self.name)
        self.machine_id = product_lines[0]

    def __str__(self):
        """Return name, image, checksum, locked and released_time."""
        fields = [self.name,
                  str(self.image),
                  str(self.checksum),
                  str(self.locked),
                  str(self.released_time)]
        return ", ".join(fields)


class MachineManager(object):
    """Owns the pool of CrosMachine objects shared by worker threads.

    self._all_machines holds every machine added through AddMachine;
    self._machines holds the subset this process has locked so far.
    """

    def __init__(self, chromeos_root, acquire_timeout):
        """Create an empty manager.

        Args:
          chromeos_root: Default ChromeOS checkout for remote commands.
          acquire_timeout: Seconds AcquireMachine may spend waiting for
            machines; it is decremented in place as time is spent.
        """
        self._lock = threading.RLock()
        self._all_machines = []
        self._machines = []
        self.image_lock = threading.Lock()
        self.num_reimages = 0
        self.chromeos_root = chromeos_root
        self.machine_checksum = {}
        self.machine_checksum_string = {}
        self.acquire_timeout = acquire_timeout
        # Without the locks directory, external locking is skipped entirely.
        self.no_lock = not os.path.isdir(lock_machine.Machine.LOCKS_DIR)
        self._initialized_machines = []

    def ImageMachine(self, machine, label):
        """Flash |label|'s image onto |machine| unless it is already there.

        A failed attempt is followed by a reboot, a 60 second wait and one
        retry.

        Args:
          machine: The CrosMachine to image.
          label: Object providing chromeos_root, chromeos_image, image_args
            and board.

        Returns:
          None if the machine's checksum already matches the label's image,
          otherwise the (falsy) return code of the successful image command.

        Raises:
          Exception: if imaging still fails after the retry.
        """
        checksum = ImageChecksummer().Checksum(label)
        if machine.checksum == checksum:
            return None
        chromeos_root = label.chromeos_root or self.chromeos_root
        image_chromeos_args = [image_chromeos.__file__,
                               "--chromeos_root=%s" % chromeos_root,
                               "--image=%s" % label.chromeos_image,
                               "--image_args=%s" % label.image_args,
                               "--remote=%s" % machine.name]
        if label.board:
            image_chromeos_args.append("--board=%s" % label.board)
        image_command = " ".join(["python"] + image_chromeos_args)

        # Currently can't image two machines at once.
        # So have to serialize on this lock.
        ce = command_executer.GetCommandExecuter()
        with self.image_lock:
            retval = ce.RunCommand(image_command)
            if retval:
                # First attempt failed: reboot the machine and try once more.
                cmd = "reboot && exit"
                ce.CrosRunCommand(cmd, machine=machine.name,
                                  chromeos_root=self.chromeos_root)
                time.sleep(60)
                retval = ce.RunCommand(image_command)
            if retval:
                raise Exception("Could not image machine: '%s'."
                                % machine.name)
            else:
                self.num_reimages += 1
            machine.checksum = checksum
            machine.image = label.chromeos_image

        return retval

    def ComputeCommonCheckSum(self, label):
        """Record the first truthy machine_checksum among |label|'s machines.

        The value is stored in self.machine_checksum[label.name].
        """
        for machine in self.GetMachines(label):
            if machine.machine_checksum:
                self.machine_checksum[label.name] = machine.machine_checksum
                break

    def ComputeCommonCheckSumString(self, label):
        """Record the first truthy checksum_string among |label|'s machines.

        The value is stored in self.machine_checksum_string[label.name].
        """
        for machine in self.GetMachines(label):
            if machine.checksum_string:
                self.machine_checksum_string[label.name] = (
                    machine.checksum_string)
                break

    def _TryToLockMachine(self, cros_machine):
        """Lock |cros_machine| externally and add it to self._machines.

        Does nothing if a machine with the same name is already held. After
        a successful lock, the image checksum stored in CHECKSUM_FILE on the
        machine (if readable) becomes cros_machine.checksum.
        """
        with self._lock:
            assert cros_machine, "Machine can't be None"
            for m in self._machines:
                if m.name == cros_machine.name:
                    return
            if self.no_lock:
                locked = True
            else:
                external_lock = lock_machine.Machine(cros_machine.name)
                locked = external_lock.Lock(True, sys.argv[0])
            if locked:
                self._machines.append(cros_machine)
                ce = command_executer.GetCommandExecuter()
                command = "cat %s" % CHECKSUM_FILE
                ret, out, _ = ce.CrosRunCommand(
                    command, return_output=True,
                    chromeos_root=self.chromeos_root,
                    machine=cros_machine.name)
                if ret == 0:
                    cros_machine.checksum = out.strip()
            else:
                logger.GetLogger().LogOutput("Couldn't lock: %s"
                                             % cros_machine.name)

    # This is called from single threaded mode.
    def AddMachine(self, machine_name):
        """Probe |machine_name| and register it if it has a checksum.

        Machines whose machine_checksum is falsy (e.g. unreachable ones) are
        silently left out of the pool.
        """
        with self._lock:
            for m in self._all_machines:
                assert m.name != machine_name, ("Tried to double-add %s"
                                                % machine_name)
            cm = CrosMachine(machine_name, self.chromeos_root)
            if cm.machine_checksum:
                self._all_machines.append(cm)

    def AreAllMachineSame(self, label):
        """Return True iff |label|'s machines share exactly one checksum.

        Returns False when the label has no registered machines.
        """
        checksums = [m.machine_checksum for m in self.GetMachines(label)]
        return len(set(checksums)) == 1

    def AcquireMachine(self, chromeos_image, label):
        """Pick a free machine for |label| and mark it as in use.

        Preference order: a machine already carrying the label's image, then
        a machine with no known image, then any machine released more than
        20 seconds ago.

        Args:
          chromeos_image: Unused by this implementation.
          label: Label whose remote list selects the candidate machines.

        Returns:
          The acquired CrosMachine, or None if none qualifies right now.
        """
        image_checksum = ImageChecksummer().Checksum(label)
        machines = self.GetMachines(label)
        check_interval_time = 120
        with self._lock:
            # Lazily external lock machines
            # NOTE(review): the wait below sleeps while holding self._lock
            # and decrements the shared self.acquire_timeout, so the budget
            # is not reset between calls -- confirm this is intended.
            while self.acquire_timeout >= 0:
                for m in machines:
                    # NOTE(review): |machines| is drawn from
                    # self._all_machines, so this looks like it is always
                    # False; self._machines may have been meant -- confirm.
                    new_machine = m not in self._all_machines
                    self._TryToLockMachine(m)
                    if new_machine:
                        m.released_time = time.time()
                if not self.AreAllMachineSame(label):
                    logger.GetLogger().LogFatal(
                        "-- not all the machine are identical")
                if self.GetAvailableMachines(label):
                    break
                else:
                    sleep_time = max(1, min(self.acquire_timeout,
                                            check_interval_time))
                    time.sleep(sleep_time)
                    self.acquire_timeout -= sleep_time

            if self.acquire_timeout < 0:
                machine_names = [machine.name for machine in machines]
                logger.GetLogger().LogFatal("Could not acquire any of the "
                                            "following machines: '%s'"
                                            % ", ".join(machine_names))

###         for m in self._machines:
###           if (m.locked and time.time() - m.released_time < 10 and
###               m.checksum == image_checksum):
###             return None
            free_machines = [machine
                             for machine in self.GetAvailableMachines(label)
                             if not machine.locked]
            for m in free_machines:
                if m.checksum == image_checksum:
                    m.locked = True
                    m.autotest_run = threading.current_thread()
                    return m
            for m in free_machines:
                if not m.checksum:
                    m.locked = True
                    m.autotest_run = threading.current_thread()
                    return m
            # This logic ensures that threads waiting on a machine will get
            # a machine with a checksum equal to their image over other
            # threads. This saves time when crosperf initially assigns the
            # machines to threads by minimizing the number of re-images.
            # TODO(asharif): If we centralize the thread-scheduler, we won't
            # need this code and can implement minimal reimaging code more
            # cleanly.
            for m in free_machines:
                if time.time() - m.released_time > 20:
                    m.locked = True
                    m.autotest_run = threading.current_thread()
                    return m
        return None

    def GetAvailableMachines(self, label=None):
        """Return the locked-by-us machines, filtered by |label| if given."""
        if not label:
            return self._machines
        return [m for m in self._machines if m.name in label.remote]

    def GetMachines(self, label=None):
        """Return all registered machines, filtered by |label| if given."""
        if not label:
            return self._all_machines
        return [m for m in self._all_machines if m.name in label.remote]

    def ReleaseMachine(self, machine):
        """Mark the held machine named like |machine| as free again."""
        with self._lock:
            for m in self._machines:
                if machine.name == m.name:
                    assert m.locked == True, ("Tried to double-release %s"
                                              % m.name)
                    m.released_time = time.time()
                    m.locked = False
                    m.status = "Available"
                    break

    def Cleanup(self):
        """Release the external lock of every machine held by this manager."""
        with self._lock:
            # Unlock all machines.
            for m in self._machines:
                if not self.no_lock:
                    res = lock_machine.Machine(m.name).Unlock(True)
                    if not res:
                        logger.GetLogger().LogError(
                            "Could not unlock machine: '%s'." % m.name)

    def __str__(self):
        """Return one line per held machine under a status header."""
        with self._lock:
            l = ["MachineManager Status:"]
            for m in self._machines:
                l.append(str(m))
            return "\n".join(l)

    def AsString(self):
        """Return a table of held machines with thread, lock and checksum."""
        with self._lock:
            stringify_fmt = "%-30s %-10s %-4s %-25s %-32s"
            header = stringify_fmt % ("Machine", "Thread", "Lock", "Status",
                                      "Checksum")
            table = [header]
            for m in self._machines:
                if m.autotest_run:
                    autotest_name = m.autotest_run.name
                    autotest_status = m.autotest_run.timeline.GetLastEvent()
                else:
                    autotest_name = ""
                    autotest_status = ""

                # Best effort: a row that cannot be formatted is left blank.
                try:
                    machine_string = stringify_fmt % (m.name,
                                                      autotest_name,
                                                      m.locked,
                                                      autotest_status,
                                                      m.checksum)
                except Exception:
                    machine_string = ""
                table.append(machine_string)
            return "Machine Status:\n%s" % "\n".join(table)

    def GetAllCPUInfo(self, labels):
        """Get cpuinfo for labels, merge them if their cpuinfo are the same."""
        dic = {}
        for label in labels:
            for machine in self._all_machines:
                if machine.name in label.remote:
                    dic.setdefault(machine.cpuinfo, []).append(label.name)
                    # Only the first matching machine represents the label.
                    break
        sections = []
        for key, v in dic.items():
            sections.append(" ".join(v))
            sections.append("\n-------------------\n")
            sections.append(key)
            sections.append("\n\n\n")
        return "".join(sections)


class MockCrosMachine(CrosMachine):
    """CrosMachine stand-in that never contacts a real machine."""

    def __init__(self, name, chromeos_root):
        """Set up state without calling CrosMachine.__init__ (no remote I/O).

        Args:
          name: Fake machine name; its digits are ignored for the checksum.
          chromeos_root: Stored as is.
        """
        self.name = name
        self.image = None
        self.checksum = None
        self.locked = False
        self.released_time = time.time()
        self.autotest_run = None
        self.chromeos_root = chromeos_root
        # In test, we assume "lumpy1", "lumpy2" are the same machine.
        self.checksum_string = re.sub(r"\d", "", name)
        self.machine_checksum = self._GetMD5Checksum(self.checksum_string)


class MockMachineManager(MachineManager):
    """MachineManager stand-in that skips locking, imaging and filtering."""

    def __init__(self, chromeos_root, acquire_timeout):
        """Forward both arguments to MachineManager.__init__."""
        super(MockMachineManager, self).__init__(chromeos_root,
                                                 acquire_timeout)

    def _TryToLockMachine(self, cros_machine):
        """Add |cros_machine| to the held list and blank its checksum."""
        self._machines.append(cros_machine)
        cros_machine.checksum = ""

    def AddMachine(self, machine_name):
        """Register a MockCrosMachine called |machine_name|."""
        with self._lock:
            for m in self._all_machines:
                assert m.name != machine_name, ("Tried to double-add %s"
                                                % machine_name)
            cm = MockCrosMachine(machine_name, self.chromeos_root)
            assert cm.machine_checksum, (
                "Could not find checksum for machine %s" % machine_name)
            self._all_machines.append(cm)

    def AcquireMachine(self, chromeos_image, label):
        """Return the first unlocked registered machine, or None."""
        for machine in self._all_machines:
            if not machine.locked:
                machine.locked = True
                return machine
        return None

    def ImageMachine(self, machine_name, label):
        """Pretend to image a machine; always returns 0."""
        return 0

    def ReleaseMachine(self, machine):
        """Mark |machine| as unlocked."""
        machine.locked = False

    def GetMachines(self, label):
        """Return every registered machine; |label| is ignored."""
        return self._all_machines

    def GetAvailableMachines(self, label):
        """Return every registered machine; |label| is ignored."""
        return self._all_machines