machine_manager.py revision e5b673f1b459a0927ca65bf9e6fd504ece57e39d
1#!/usr/bin/python 2 3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7import hashlib 8import image_chromeos 9import lock_machine 10import math 11import os.path 12import re 13import sys 14import threading 15import time 16 17from utils import command_executer 18from utils import logger 19from utils.file_utils import FileUtils 20 21from image_checksummer import ImageChecksummer 22 23CHECKSUM_FILE = "/usr/local/osimage_checksum_file" 24 25 26class CrosMachine(object): 27 def __init__(self, name, chromeos_root): 28 self.name = name 29 self.image = None 30 self.checksum = None 31 self.locked = False 32 self.released_time = time.time() 33 self.autotest_run = None 34 self.chromeos_root = chromeos_root 35 if not self.IsReachable(): 36 self.machine_checksum = None 37 return 38 self._GetMemoryInfo() 39 self._GetCPUInfo() 40 self._ComputeMachineChecksumString() 41 self._GetMachineID() 42 self.machine_checksum = self._GetMD5Checksum(self.checksum_string) 43 self.machine_id_checksum = self._GetMD5Checksum(self.machine_id) 44 45 def IsReachable(self): 46 ce = command_executer.GetCommandExecuter() 47 command = "ls" 48 ret = ce.CrosRunCommand(command, 49 machine=self.name, 50 chromeos_root=self.chromeos_root) 51 if ret: 52 return False 53 return True 54 55 def _ParseMemoryInfo(self): 56 line = self.meminfo.splitlines()[0] 57 usable_kbytes = int(line.split()[1]) 58 # This code is from src/third_party/autotest/files/client/bin/base_utils.py 59 # usable_kbytes is system's usable DRAM in kbytes, 60 # as reported by memtotal() from device /proc/meminfo memtotal 61 # after Linux deducts 1.5% to 9.5% for system table overhead 62 # Undo the unknown actual deduction by rounding up 63 # to next small multiple of a big power-of-two 64 # eg 12GB - 5.1% gets rounded back up to 12GB 65 mindeduct = 0.005 # 0.5 percent 66 maxdeduct = 0.095 # 9.5 percent 67 # deduction range 1.5% .. 9.5% supports physical mem sizes 68 # 6GB .. 12GB in steps of .5GB 69 # 12GB .. 24GB in steps of 1 GB 70 # 24GB .. 48GB in steps of 2 GB ... 71 # Finer granularity in physical mem sizes would require 72 # tighter spread between min and max possible deductions 73 74 # increase mem size by at least min deduction, without rounding 75 min_kbytes = int(usable_kbytes / (1.0 - mindeduct)) 76 # increase mem size further by 2**n rounding, by 0..roundKb or more 77 round_kbytes = int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes 78 # find least binary roundup 2**n that covers worst-cast roundKb 79 mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2))) 80 # have round_kbytes <= mod2n < round_kbytes*2 81 # round min_kbytes up to next multiple of mod2n 82 phys_kbytes = min_kbytes + mod2n - 1 83 phys_kbytes -= phys_kbytes % mod2n # clear low bits 84 self.phys_kbytes = phys_kbytes 85 86 def _GetMemoryInfo(self): 87 #TODO yunlian: when the machine in rebooting, it will not return 88 #meminfo, the assert does not catch it either 89 ce = command_executer.GetCommandExecuter() 90 command = "cat /proc/meminfo" 91 ret, self.meminfo, _ = ce.CrosRunCommand( 92 command, return_output=True, 93 machine=self.name, username="root", chromeos_root=self.chromeos_root) 94 assert ret == 0, "Could not get meminfo from machine: %s" % self.name 95 if ret == 0: 96 self._ParseMemoryInfo() 97 98 #cpuinfo format is different across architecture 99 #need to find a better way to parse it. 100 def _ParseCPUInfo(self,cpuinfo): 101 return 0 102 103 def _GetCPUInfo(self): 104 ce = command_executer.GetCommandExecuter() 105 command = "cat /proc/cpuinfo" 106 ret, self.cpuinfo, _ = ce.CrosRunCommand( 107 command, return_output=True, 108 machine=self.name, username="root", chromeos_root=self.chromeos_root) 109 assert ret == 0, "Could not get cpuinfo from machine: %s" % self.name 110 if ret == 0: 111 self._ParseCPUInfo(self.cpuinfo) 112 113 def _ComputeMachineChecksumString(self): 114 self.checksum_string = "" 115 exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"] 116 for line in self.cpuinfo.splitlines(): 117 if not any([e in line for e in exclude_lines_list]): 118 self.checksum_string += line 119 self.checksum_string += " " + str(self.phys_kbytes) 120 121 def _GetMD5Checksum(self, ss): 122 if ss: 123 return hashlib.md5(ss).hexdigest() 124 else: 125 return "" 126 127 def _GetMachineID(self): 128 ce = command_executer.GetCommandExecuter() 129 command = "dump_vpd_log --full --stdout" 130 ret, if_out, _ = ce.CrosRunCommand( 131 command, return_output=True, 132 machine=self.name, chromeos_root=self.chromeos_root) 133 b = if_out.splitlines() 134 a = [l for l in b if "Product" in l] 135 self.machine_id = a[0] 136 assert ret == 0, "Could not get machine_id from machine: %s" % self.name 137 138 def __str__(self): 139 l = [] 140 l.append(self.name) 141 l.append(str(self.image)) 142 l.append(str(self.checksum)) 143 l.append(str(self.locked)) 144 l.append(str(self.released_time)) 145 return ", ".join(l) 146 147 148class MachineManager(object): 149 def __init__(self, chromeos_root, acquire_timeout): 150 self._lock = threading.RLock() 151 self._all_machines = [] 152 self._machines = [] 153 self.image_lock = threading.Lock() 154 self.num_reimages = 0 155 self.chromeos_root = None 156 self.machine_checksum = {} 157 self.machine_checksum_string = {} 158 self.acquire_timeout = acquire_timeout 159 160 if os.path.isdir(lock_machine.Machine.LOCKS_DIR): 161 self.no_lock = False 162 else: 163 self.no_lock = True 164 self._initialized_machines = [] 165 self.chromeos_root = chromeos_root 166 167 def ImageMachine(self, machine, label): 168 checksum = ImageChecksummer().Checksum(label) 169 if machine.checksum == checksum: 170 return 171 chromeos_root = label.chromeos_root 172 if not chromeos_root: 173 chromeos_root = self.chromeos_root 174 image_chromeos_args = [image_chromeos.__file__, 175 "--chromeos_root=%s" % chromeos_root, 176 "--image=%s" % label.chromeos_image, 177 "--image_args=%s" % label.image_args, 178 "--remote=%s" % machine.name] 179 if label.board: 180 image_chromeos_args.append("--board=%s" % label.board) 181 182 # Currently can't image two machines at once. 183 # So have to serialized on this lock. 184 ce = command_executer.GetCommandExecuter() 185 with self.image_lock: 186 retval = ce.RunCommand(" ".join(["python"] + image_chromeos_args)) 187 if retval: 188 cmd ="reboot && exit" 189 ce.CrosRunCommand(cmd, machine=machine.name, 190 chromeos_root=self.chromeos_root) 191 time.sleep(60) 192 retval = ce.RunCommand(" ".join(["python"] + image_chromeos_args)) 193 if retval: 194 raise Exception("Could not image machine: '%s'." % machine.name) 195 else: 196 self.num_reimages += 1 197 machine.checksum = checksum 198 machine.image = label.chromeos_image 199 200 return retval 201 202 def ComputeCommonCheckSum(self, label): 203 for machine in self.GetMachines(label): 204 if machine.machine_checksum: 205 self.machine_checksum[label.name] = machine.machine_checksum 206 break 207 208 def ComputeCommonCheckSumString(self, label): 209 for machine in self.GetMachines(label): 210 if machine.checksum_string: 211 self.machine_checksum_string[label.name] = machine.checksum_string 212 break 213 214 def _TryToLockMachine(self, cros_machine): 215 with self._lock: 216 assert cros_machine, "Machine can't be None" 217 for m in self._machines: 218 if m.name == cros_machine.name: 219 return 220 if self.no_lock: 221 locked = True 222 else: 223 locked = lock_machine.Machine(cros_machine.name).Lock(True, sys.argv[0]) 224 if locked: 225 self._machines.append(cros_machine) 226 ce = command_executer.GetCommandExecuter() 227 command = "cat %s" % CHECKSUM_FILE 228 ret, out, _ = ce.CrosRunCommand( 229 command, return_output=True, chromeos_root=self.chromeos_root, 230 machine=cros_machine.name) 231 if ret == 0: 232 cros_machine.checksum = out.strip() 233 else: 234 logger.GetLogger().LogOutput("Couldn't lock: %s" % cros_machine.name) 235 236 # This is called from single threaded mode. 237 def AddMachine(self, machine_name): 238 with self._lock: 239 for m in self._all_machines: 240 assert m.name != machine_name, "Tried to double-add %s" % machine_name 241 cm = CrosMachine(machine_name, self.chromeos_root) 242 if cm.machine_checksum: 243 self._all_machines.append(cm) 244 245 def AreAllMachineSame(self, label): 246 checksums = [m.machine_checksum for m in self.GetMachines(label)] 247 return len(set(checksums)) == 1 248 249 def RemoveMachine(self, machine_name): 250 with self._lock: 251 self._machines = [m for m in self._machines 252 if m.name != machine_name] 253 res = lock_machine.Machine(machine_name).Unlock(True) 254 if not res: 255 logger.GetLogger().LogError("Could not unlock machine: '%s'." 256 % m.name) 257 258 def AcquireMachine(self, chromeos_image, label): 259 image_checksum = ImageChecksummer().Checksum(label) 260 machines = self.GetMachines(label) 261 check_interval_time = 120 262 with self._lock: 263 # Lazily external lock machines 264 while self.acquire_timeout >= 0: 265 for m in machines: 266 new_machine = m not in self._all_machines 267 self._TryToLockMachine(m) 268 if new_machine: 269 m.released_time = time.time() 270 if not self.AreAllMachineSame(label): 271 logger.GetLogger().LogFatal("-- not all the machine are identical") 272 if self.GetAvailableMachines(label): 273 break 274 else: 275 sleep_time = max(1, min(self.acquire_timeout, check_interval_time)) 276 time.sleep(sleep_time) 277 self.acquire_timeout -= sleep_time 278 279 if self.acquire_timeout < 0: 280 machine_names = [] 281 for machine in machines: 282 machine_names.append(machine.name) 283 logger.GetLogger().LogFatal("Could not acquire any of the " 284 "following machines: '%s'" 285 % ", ".join(machine_names)) 286 287### for m in self._machines: 288### if (m.locked and time.time() - m.released_time < 10 and 289### m.checksum == image_checksum): 290### return None 291 for m in [machine for machine in self.GetAvailableMachines(label) 292 if not machine.locked]: 293 if m.checksum == image_checksum: 294 m.locked = True 295 m.autotest_run = threading.current_thread() 296 return m 297 for m in [machine for machine in self.GetAvailableMachines(label) 298 if not machine.locked]: 299 if not m.checksum: 300 m.locked = True 301 m.autotest_run = threading.current_thread() 302 return m 303 # This logic ensures that threads waiting on a machine will get a machine 304 # with a checksum equal to their image over other threads. This saves time 305 # when crosperf initially assigns the machines to threads by minimizing 306 # the number of re-images. 307 # TODO(asharif): If we centralize the thread-scheduler, we wont need this 308 # code and can implement minimal reimaging code more cleanly. 309 for m in [machine for machine in self.GetAvailableMachines(label) 310 if not machine.locked]: 311 if time.time() - m.released_time > 20: 312 m.locked = True 313 m.autotest_run = threading.current_thread() 314 return m 315 return None 316 317 def GetAvailableMachines(self, label=None): 318 if not label: 319 return self._machines 320 return [m for m in self._machines if m.name in label.remote] 321 322 def GetMachines(self, label=None): 323 if not label: 324 return self._all_machines 325 return [m for m in self._all_machines if m.name in label.remote] 326 327 def ReleaseMachine(self, machine): 328 with self._lock: 329 for m in self._machines: 330 if machine.name == m.name: 331 assert m.locked == True, "Tried to double-release %s" % m.name 332 m.released_time = time.time() 333 m.locked = False 334 m.status = "Available" 335 break 336 337 def Cleanup(self): 338 with self._lock: 339 # Unlock all machines. 340 for m in self._machines: 341 if not self.no_lock: 342 res = lock_machine.Machine(m.name).Unlock(True) 343 if not res: 344 logger.GetLogger().LogError("Could not unlock machine: '%s'." 345 % m.name) 346 347 def __str__(self): 348 with self._lock: 349 l = ["MachineManager Status:"] 350 for m in self._machines: 351 l.append(str(m)) 352 return "\n".join(l) 353 354 def AsString(self): 355 with self._lock: 356 stringify_fmt = "%-30s %-10s %-4s %-25s %-32s" 357 header = stringify_fmt % ("Machine", "Thread", "Lock", "Status", 358 "Checksum") 359 table = [header] 360 for m in self._machines: 361 if m.autotest_run: 362 autotest_name = m.autotest_run.name 363 autotest_status = m.autotest_run.timeline.GetLastEvent() 364 else: 365 autotest_name = "" 366 autotest_status = "" 367 368 try: 369 machine_string = stringify_fmt % (m.name, 370 autotest_name, 371 m.locked, 372 autotest_status, 373 m.checksum) 374 except Exception: 375 machine_string = "" 376 table.append(machine_string) 377 return "Machine Status:\n%s" % "\n".join(table) 378 379 def GetAllCPUInfo(self, labels): 380 """Get cpuinfo for labels, merge them if their cpuinfo are the same.""" 381 dic = {} 382 for label in labels: 383 for machine in self._all_machines: 384 if machine.name in label.remote: 385 if machine.cpuinfo not in dic: 386 dic[machine.cpuinfo] = [label.name] 387 else: 388 dic[machine.cpuinfo].append(label.name) 389 break 390 output = "" 391 for key, v in dic.items(): 392 output += " ".join(v) 393 output += "\n-------------------\n" 394 output += key 395 output += "\n\n\n" 396 return output 397 398 399class MockCrosMachine(CrosMachine): 400 def __init__(self, name, chromeos_root): 401 self.name = name 402 self.image = None 403 self.checksum = None 404 self.locked = False 405 self.released_time = time.time() 406 self.autotest_run = None 407 self.chromeos_root = chromeos_root 408 self.checksum_string = re.sub("\d", "", name) 409 #In test, we assume "lumpy1", "lumpy2" are the same machine. 410 self.machine_checksum = self._GetMD5Checksum(self.checksum_string) 411 412 413class MockMachineManager(MachineManager): 414 415 def __init__(self, chromeos_root, acquire_timeout): 416 super(MockMachineManager, self).__init__(chromeos_root, acquire_timeout) 417 418 def _TryToLockMachine(self, cros_machine): 419 self._machines.append(cros_machine) 420 cros_machine.checksum = "" 421 422 def AddMachine(self, machine_name): 423 with self._lock: 424 for m in self._all_machines: 425 assert m.name != machine_name, "Tried to double-add %s" % machine_name 426 cm = MockCrosMachine(machine_name, self.chromeos_root) 427 assert cm.machine_checksum, ("Could not find checksum for machine %s" % 428 machine_name) 429 self._all_machines.append(cm) 430 431 def AcquireMachine(self, chromeos_image, label): 432 for machine in self._all_machines: 433 if not machine.locked: 434 machine.locked = True 435 return machine 436 return None 437 438 def ImageMachine(self, machine_name, label): 439 return 0 440 441 def ReleaseMachine(self, machine): 442 machine.locked = False 443 444 def GetMachines(self, label): 445 return self._all_machines 446 447 def GetAvailableMachines(self, label): 448 return self._all_machines 449