machine_manager.py revision c454cee542ca459ef9bd87c9f72e81c822caf1e5
1#!/usr/bin/python
2
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import hashlib
8import image_chromeos
9import lock_machine
10import math
11import os.path
12import re
13import sys
14import threading
15import time
16
17from utils import command_executer
18from utils import logger
19from utils.file_utils import FileUtils
20
21from image_checksummer import ImageChecksummer
22
# Path that MachineManager._TryToLockMachine reads (with `cat`) on a machine
# it has just locked; the stripped output becomes that machine's `checksum`.
CHECKSUM_FILE = "/usr/local/osimage_checksum_file"
24
25
class CrosMachine(object):
  """Bookkeeping and hardware fingerprint for one test machine.

  On construction the machine is probed through the command executer.  When
  it answers, its /proc/meminfo, /proc/cpuinfo and an identifier string are
  collected and condensed into two MD5 digests:

    machine_checksum:    digest of the filtered cpuinfo plus rounded memory.
    machine_id_checksum: digest of the machine identifier string.

  When it does not answer, machine_checksum stays None and the remaining
  probe-derived attributes keep their empty defaults.
  """

  def __init__(self, name, chromeos_root, log_level):
    """Record the machine's identity and probe it if it is reachable.

    Args:
      name: machine name, passed as `machine=` to every executer call.
      chromeos_root: passed as `chromeos_root=` to every executer call.
      log_level: log level handed to command_executer.GetCommandExecuter.
    """
    self.name = name
    self.image = None
    self.checksum = None
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    self.log_level = log_level
    # Give every probe-derived attribute a default so that an unreachable
    # machine is still a fully formed object instead of raising
    # AttributeError on first access.
    self.meminfo = ""
    self.cpuinfo = ""
    self.phys_kbytes = 0
    self.checksum_string = ""
    self.machine_id = ""
    self.machine_checksum = None
    self.machine_id_checksum = ""
    if not self.IsReachable():
      return
    self._GetMemoryInfo()
    self._GetCPUInfo()
    self._ComputeMachineChecksumString()
    self._GetMachineID()
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
    self.machine_id_checksum = self._GetMD5Checksum(self.machine_id)

  def IsReachable(self):
    """Return True when running `ls` on the machine yields a zero status."""
    ce = command_executer.GetCommandExecuter(log_level=self.log_level)
    ret = ce.CrosRunCommand("ls",
                            machine=self.name,
                            chromeos_root=self.chromeos_root)
    return not ret

  def _ParseMemoryInfo(self):
    """Derive self.phys_kbytes (rounded physical memory) from self.meminfo.

    The second whitespace-separated field of the MemTotal line is taken as
    the usable memory in kbytes and rounded up to a multiple of a power of
    two, to undo the kernel's unknown overhead deduction.

    Raises:
      IndexError: if self.meminfo contains no lines at all.
    """
    lines = self.meminfo.splitlines()
    # Prefer the explicit MemTotal entry; fall back to the first line, which
    # is where the original code always looked.
    line = next((l for l in lines if l.startswith("MemTotal")), lines[0])
    usable_kbytes = int(line.split()[1])
    # This code is from src/third_party/test/files/client/bin/base_utils.py
    # usable_kbytes is system's usable DRAM in kbytes,
    #   as reported by memtotal() from device /proc/meminfo memtotal
    #   after Linux deducts 1.5% to 9.5% for system table overhead
    # Undo the unknown actual deduction by rounding up
    #   to next small multiple of a big power-of-two
    #   eg  12GB - 5.1% gets rounded back up to 12GB
    mindeduct = 0.005  # 0.5 percent
    maxdeduct = 0.095  # 9.5 percent
    # deduction range 1.5% .. 9.5% supports physical mem sizes
    #    6GB .. 12GB in steps of .5GB
    #   12GB .. 24GB in steps of 1 GB
    #   24GB .. 48GB in steps of 2 GB ...
    # Finer granularity in physical mem sizes would require
    #   tighter spread between min and max possible deductions

    # increase mem size by at least min deduction, without rounding
    min_kbytes = int(usable_kbytes / (1.0 - mindeduct))
    # increase mem size further by 2**n rounding, by 0..roundKb or more
    round_kbytes = int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes
    # find least binary roundup 2**n that covers worst-case roundKb
    mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2)))
    # have round_kbytes <= mod2n < round_kbytes*2
    # round min_kbytes up to next multiple of mod2n
    phys_kbytes = min_kbytes + mod2n - 1
    phys_kbytes -= phys_kbytes % mod2n  # clear low bits
    self.phys_kbytes = phys_kbytes

  def _GetMemoryInfo(self):
    """Fetch /proc/meminfo into self.meminfo and parse it."""
    #TODO yunlian: when the machine in rebooting, it will not return
    #meminfo, the assert does not catch it either
    ce = command_executer.GetCommandExecuter(log_level=self.log_level)
    ret, self.meminfo, _ = ce.CrosRunCommand(
        "cat /proc/meminfo", return_output=True,
        machine=self.name, username="root", chromeos_root=self.chromeos_root)
    assert ret == 0, "Could not get meminfo from machine: %s" % self.name
    # Still guarded explicitly: asserts are stripped when running with -O.
    if ret == 0:
      self._ParseMemoryInfo()

  #cpuinfo format is different across architecture
  #need to find a better way to parse it.
  def _ParseCPUInfo(self, cpuinfo):
    """Placeholder parser; ignores its argument and returns 0."""
    return 0

  def _GetCPUInfo(self):
    """Fetch /proc/cpuinfo into self.cpuinfo and hand it to the parser."""
    ce = command_executer.GetCommandExecuter(log_level=self.log_level)
    ret, self.cpuinfo, _ = ce.CrosRunCommand(
        "cat /proc/cpuinfo", return_output=True,
        machine=self.name, username="root", chromeos_root=self.chromeos_root)
    assert ret == 0, "Could not get cpuinfo from machine: %s" % self.name
    if ret == 0:
      self._ParseCPUInfo(self.cpuinfo)

  def _ComputeMachineChecksumString(self):
    """Build self.checksum_string from cpuinfo and rounded memory size.

    cpuinfo lines containing "MHz", "BogoMIPS" or "bogomips" are dropped;
    the remaining lines are concatenated without separators and followed by
    a space and self.phys_kbytes.
    """
    exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"]
    kept_lines = [line for line in self.cpuinfo.splitlines()
                  if not any(e in line for e in exclude_lines_list)]
    self.checksum_string = "".join(kept_lines) + " " + str(self.phys_kbytes)

  def _GetMD5Checksum(self, ss):
    """Return the hex MD5 digest of ss, or "" when ss is empty or None.

    Text input is encoded as UTF-8 before hashing because hashlib only
    accepts bytes; byte strings are hashed unchanged.
    """
    if not ss:
      return ""
    if not isinstance(ss, bytes):
      ss = ss.encode("utf-8")
    return hashlib.md5(ss).hexdigest()

  def _GetMachineID(self):
    """Set self.machine_id from the first identifying output available.

    Tried in order:
      1. the first "Product" line of `dump_vpd_log --full --stdout`;
      2. all "HWaddr" lines of `ifconfig`, joined with "_";
      3. all "ether" lines of `ifconfig`, joined with "_".

    Raises:
      AssertionError: if none of the three yields a line.
    """
    ce = command_executer.GetCommandExecuter(log_level=self.log_level)
    _, vpd_out, _ = ce.CrosRunCommand(
        "dump_vpd_log --full --stdout", return_output=True,
        machine=self.name, chromeos_root=self.chromeos_root)
    product_lines = [l for l in vpd_out.splitlines() if "Product" in l]
    if product_lines:
      self.machine_id = product_lines[0]
      return
    _, if_out, _ = ce.CrosRunCommand(
        "ifconfig", return_output=True,
        machine=self.name, chromeos_root=self.chromeos_root)
    if_lines = if_out.splitlines()
    for marker in ("HWaddr", "ether"):
      matches = [l for l in if_lines if marker in l]
      if matches:
        self.machine_id = "_".join(matches)
        return
    assert 0, "Could not get machine_id from machine: %s" % self.name

  def __str__(self):
    """Return "name, image, checksum, locked, released_time"."""
    fields = [self.name,
              str(self.image),
              str(self.checksum),
              str(self.locked),
              str(self.released_time)]
    return ", ".join(fields)
162
163
class MachineManager(object):
  """Owns the pool of machines and hands them out to test-run threads.

  Two lists are kept:
    _all_machines: every machine accepted by AddMachine.
    _machines:     the subset that _TryToLockMachine has claimed for this
                   process (through lock_machine unless no_lock is set).

  A machine's `locked` flag marks it as currently handed out by
  AcquireMachine; ReleaseMachine clears it again.
  """

  def __init__(self, chromeos_root, acquire_timeout, log_level):
    """Create an empty manager.

    Args:
      chromeos_root: default chromeos root used for executer calls.
      acquire_timeout: seconds AcquireMachine may spend waiting; it is
        decremented in place while waiting.
      log_level: log level passed to executers; any value other than
        "verbose" enables the extra progress messages.
    """
    self._lock = threading.RLock()
    self._all_machines = []
    self._machines = []
    self.image_lock = threading.Lock()
    self.num_reimages = 0
    self.chromeos_root = chromeos_root
    self.machine_checksum = {}
    self.machine_checksum_string = {}
    self.acquire_timeout = acquire_timeout
    self.log_level = log_level
    # Without a locks directory the lock_machine calls are skipped entirely.
    self.no_lock = not os.path.isdir(lock_machine.Machine.LOCKS_DIR)
    self._initialized_machines = []

  def ImageMachine(self, machine, label):
    """Push label's image onto machine unless its checksum already matches.

    If the first push fails, the machine is rebooted, 60 seconds are waited
    and the push is retried once.

    Args:
      machine: machine object to image; its `checksum` and `image`
        attributes are updated on success.
      label: provides image_type, chromeos_image, chromeos_root, image_args
        and board.

    Returns:
      None when the machine already carries the image, otherwise the
      (falsy) status of the successful push command.

    Raises:
      Exception: if the push still fails after the retry.
    """
    if label.image_type == "local":
      checksum = ImageChecksummer().Checksum(label, self.log_level)
    elif label.image_type == "trybot":
      checksum = machine._GetMD5Checksum(label.chromeos_image)
    else:
      checksum = None

    if checksum and (machine.checksum == checksum):
      return
    chromeos_root = label.chromeos_root or self.chromeos_root
    image_chromeos_args = [image_chromeos.__file__,
                           "--chromeos_root=%s" % chromeos_root,
                           "--image=%s" % label.chromeos_image,
                           "--image_args=%s" % label.image_args,
                           "--remote=%s" % machine.name,
                           "--logging_level=%s" % self.log_level]
    if label.board:
      image_chromeos_args.append("--board=%s" % label.board)

    quiet = self.log_level != "verbose"
    if quiet:
      ce = command_executer.GetCommandExecuter(log_level="average")
    else:
      ce = command_executer.GetCommandExecuter()

    def PushImage():
      """Log (when not verbose) and run the imaging command once."""
      if quiet:
        logger.GetLogger().LogOutput("Pushing image onto machine.")
        logger.GetLogger().LogOutput("CMD : python %s "
                                     % " ".join(image_chromeos_args))
      return ce.RunCommand(" ".join(["python"] + image_chromeos_args))

    # Currently can't image two machines at once.
    # So have to serialized on this lock.
    with self.image_lock:
      retval = PushImage()
      if retval:
        # First attempt failed: reboot the machine and try exactly once more.
        if quiet:
          logger.GetLogger().LogOutput("reboot & exit.")
        ce.CrosRunCommand("reboot && exit", machine=machine.name,
                          chromeos_root=self.chromeos_root)
        time.sleep(60)
        retval = PushImage()
      if retval:
        raise Exception("Could not image machine: '%s'." % machine.name)
      self.num_reimages += 1
      machine.checksum = checksum
      machine.image = label.chromeos_image

    return retval

  def ComputeCommonCheckSum(self, label):
    """Store the first non-empty machine_checksum among label's machines."""
    for machine in self.GetMachines(label):
      if machine.machine_checksum:
        self.machine_checksum[label.name] = machine.machine_checksum
        break

  def ComputeCommonCheckSumString(self, label):
    """Store the first non-empty checksum_string among label's machines."""
    for machine in self.GetMachines(label):
      if machine.checksum_string:
        self.machine_checksum_string[label.name] = machine.checksum_string
        break

  def _TryToLockMachine(self, cros_machine):
    """Claim cros_machine for this manager if it is not claimed already.

    On success the machine joins self._machines and its `checksum` is
    refreshed from CHECKSUM_FILE on the machine (when that file can be
    read).  On failure a message is logged and nothing else changes.
    """
    with self._lock:
      assert cros_machine, "Machine can't be None"
      if any(m.name == cros_machine.name for m in self._machines):
        return
      if self.no_lock:
        locked = True
      else:
        locked = lock_machine.Machine(cros_machine.name).Lock(True, sys.argv[0])
      if not locked:
        logger.GetLogger().LogOutput("Couldn't lock: %s" % cros_machine.name)
        return
      self._machines.append(cros_machine)
      ce = command_executer.GetCommandExecuter(log_level=self.log_level)
      ret, out, _ = ce.CrosRunCommand(
          "cat %s" % CHECKSUM_FILE, return_output=True,
          chromeos_root=self.chromeos_root, machine=cros_machine.name)
      if ret == 0:
        cros_machine.checksum = out.strip()

  # This is called from single threaded mode.
  def AddMachine(self, machine_name):
    """Probe machine_name and register it if a machine checksum was computed.

    A machine whose machine_checksum is empty (e.g. unreachable) is dropped
    silently.
    """
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      if self.log_level != "verbose":
        logger.GetLogger().LogOutput("Setting up remote access to %s"
                                     % machine_name)
        logger.GetLogger().LogOutput("Checking machine characteristics for %s"
                                     % machine_name)
      cm = CrosMachine(machine_name, self.chromeos_root, self.log_level)
      if cm.machine_checksum:
        self._all_machines.append(cm)

  def AreAllMachineSame(self, label):
    """Return True iff label's machines share exactly one machine_checksum.

    Note that this is False when the label has no machines at all.
    """
    checksums = set(m.machine_checksum for m in self.GetMachines(label))
    return len(checksums) == 1

  def RemoveMachine(self, machine_name):
    """Drop machine_name from the claimed machines and release its lock."""
    with self._lock:
      self._machines = [m for m in self._machines
                        if m.name != machine_name]
      # Nothing was locked through lock_machine in no_lock mode, so there is
      # nothing to unlock (same rule as Cleanup and _TryToLockMachine).
      if self.no_lock:
        return
      res = lock_machine.Machine(machine_name).Unlock(True)
      if not res:
        # Report the requested name; the comprehension variable above is not
        # a valid reference to the removed machine.
        logger.GetLogger().LogError("Could not unlock machine: '%s'."
                                    % machine_name)

  def _ClaimForCurrentThread(self, machine):
    """Mark machine as handed out to the calling thread and return it."""
    machine.locked = True
    machine.test_run = threading.current_thread()
    return machine

  def AcquireMachine(self, chromeos_image, label):
    """Hand out one of label's machines to the calling thread.

    Waits (holding the manager lock, in steps of at most 120 seconds) until
    at least one of the label's machines has been claimed, consuming
    self.acquire_timeout while doing so.  Among claimed machines that are
    not handed out, the preference order is:
      1. a machine whose checksum equals the wanted image checksum;
      2. a machine without any checksum;
      3. a machine released more than 20 seconds ago.

    Args:
      chromeos_image: image identifier; hashed for "trybot" labels.
      label: label whose `remote` list selects the candidate machines.

    Returns:
      The acquired machine, or None if no candidate qualifies right now.
    """
    if label.image_type == "local":
      image_checksum = ImageChecksummer().Checksum(label, self.log_level)
    elif label.image_type == "trybot":
      image_bytes = chromeos_image
      # hashlib only accepts bytes; encode text so this also works for
      # unicode input.
      if not isinstance(image_bytes, bytes):
        image_bytes = image_bytes.encode("utf-8")
      image_checksum = hashlib.md5(image_bytes).hexdigest()
    else:
      image_checksum = None
    machines = self.GetMachines(label)
    check_interval_time = 120
    with self._lock:
      # Lazily external lock machines
      while self.acquire_timeout >= 0:
        for m in machines:
          # NOTE(review): GetMachines in this class draws from
          # _all_machines, so this looks like it is always False here;
          # possibly self._machines was intended - confirm before changing.
          new_machine = m not in self._all_machines
          self._TryToLockMachine(m)
          if new_machine:
            m.released_time = time.time()
        if not self.AreAllMachineSame(label):
          logger.GetLogger().LogFatal("-- not all the machine are identical")
        if self.GetAvailableMachines(label):
          break
        sleep_time = max(1, min(self.acquire_timeout, check_interval_time))
        time.sleep(sleep_time)
        self.acquire_timeout -= sleep_time

      if self.acquire_timeout < 0:
        machine_names = [machine.name for machine in machines]
        logger.GetLogger().LogFatal("Could not acquire any of the "
                                    "following machines: '%s'"
                                    % ", ".join(machine_names))

      # The manager lock is held, so the set of free machines cannot change
      # between the three preference passes below.
      free_machines = [machine
                       for machine in self.GetAvailableMachines(label)
                       if not machine.locked]
      for m in free_machines:
        if image_checksum and (m.checksum == image_checksum):
          return self._ClaimForCurrentThread(m)
      for m in free_machines:
        if not m.checksum:
          return self._ClaimForCurrentThread(m)
      # This logic ensures that threads waiting on a machine will get a machine
      # with a checksum equal to their image over other threads. This saves time
      # when crosperf initially assigns the machines to threads by minimizing
      # the number of re-images.
      # TODO(asharif): If we centralize the thread-scheduler, we wont need this
      # code and can implement minimal reimaging code more cleanly.
      for m in free_machines:
        if time.time() - m.released_time > 20:
          return self._ClaimForCurrentThread(m)
    return None

  def GetAvailableMachines(self, label=None):
    """Return claimed machines, restricted to label.remote if label is given."""
    if not label:
      return self._machines
    return [m for m in self._machines if m.name in label.remote]

  def GetMachines(self, label=None):
    """Return all added machines, restricted to label.remote if given."""
    if not label:
      return self._all_machines
    return [m for m in self._all_machines if m.name in label.remote]

  def ReleaseMachine(self, machine):
    """Mark the claimed machine with machine's name as no longer handed out.

    Raises:
      AssertionError: if that machine is not currently handed out.
    """
    with self._lock:
      for m in self._machines:
        if machine.name == m.name:
          assert m.locked == True, "Tried to double-release %s" % m.name
          m.released_time = time.time()
          m.locked = False
          m.status = "Available"
          break

  def Cleanup(self):
    """Unlock every claimed machine (no-op in no_lock mode)."""
    with self._lock:
      if self.no_lock:
        return
      # Unlock all machines.
      for m in self._machines:
        res = lock_machine.Machine(m.name).Unlock(True)
        if not res:
          logger.GetLogger().LogError("Could not unlock machine: '%s'."
                                      % m.name)

  def __str__(self):
    """Return a header line followed by str() of each claimed machine."""
    with self._lock:
      lines = ["MachineManager Status:"]
      lines.extend(str(m) for m in self._machines)
      return "\n".join(lines)

  def AsString(self):
    """Return a fixed-width status table of the claimed machines."""
    with self._lock:
      stringify_fmt = "%-30s %-10s %-4s %-25s %-32s"
      header = stringify_fmt % ("Machine", "Thread", "Lock", "Status",
                                "Checksum")
      table = [header]
      for m in self._machines:
        if m.test_run:
          test_name = m.test_run.name
          test_status = m.test_run.timeline.GetLastEvent()
        else:
          test_name = ""
          test_status = ""

        # Best effort: a row that cannot be formatted becomes an empty line
        # rather than aborting the whole status report.
        try:
          machine_string = stringify_fmt % (m.name,
                                            test_name,
                                            m.locked,
                                            test_status,
                                            m.checksum)
        except Exception:
          machine_string = ""
        table.append(machine_string)
      return "Machine Status:\n%s" % "\n".join(table)

  def GetAllCPUInfo(self, labels):
    """Get cpuinfo for labels, merge them if their cpuinfo are the same.

    Only the first added machine matching each label is consulted.

    Returns:
      One section per distinct cpuinfo text: the space-joined label names,
      a dashed separator line, the cpuinfo text and three newlines.
    """
    labels_by_cpuinfo = {}
    for label in labels:
      for machine in self._all_machines:
        if machine.name in label.remote:
          labels_by_cpuinfo.setdefault(machine.cpuinfo, []).append(label.name)
          break
    sections = []
    for cpuinfo, label_names in labels_by_cpuinfo.items():
      sections.append(" ".join(label_names))
      sections.append("\n-------------------\n")
      sections.append(cpuinfo)
      sections.append("\n\n\n")
    return "".join(sections)
444
445
class MockCrosMachine(CrosMachine):
  """CrosMachine stand-in for tests that never contacts a real machine.

  The machine checksum is derived from the machine name with all digits
  removed.
  """

  def __init__(self, name, chromeos_root, log_level):
    """Set up the bookkeeping fields without probing any machine.

    Args:
      name: machine name; also the source of the checksum string.
      chromeos_root: stored as-is.
      log_level: stored as-is.
    """
    self.name = name
    self.image = None
    self.checksum = None
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    # Raw string so that \d reaches the regex engine as a digit class and is
    # not treated as an (invalid) string escape.
    self.checksum_string = re.sub(r"\d", "", name)
    #In test, we assume "lumpy1", "lumpy2" are the same machine.
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
    self.log_level = log_level

  def IsReachable(self):
    """Always report the mock machine as reachable."""
    return True
462
463
class MockMachineManager(MachineManager):
  """MachineManager stand-in for tests, backed by MockCrosMachine objects.

  Locking, imaging and label filtering are all stubbed out: every query
  answers with the full list of added machines.
  """

  def __init__(self, chromeos_root, acquire_timeout, log_level):
    """Forward all arguments unchanged to MachineManager.__init__."""
    super(MockMachineManager, self).__init__(chromeos_root, acquire_timeout,
                                             log_level)

  def _TryToLockMachine(self, cros_machine):
    """Record cros_machine as claimed and reset its checksum to ""."""
    self._machines.append(cros_machine)
    cros_machine.checksum = ""

  def AddMachine(self, machine_name):
    """Create a MockCrosMachine for machine_name and register it.

    Raises:
      AssertionError: if the name was added before or yields no checksum.
    """
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      cm = MockCrosMachine(machine_name, self.chromeos_root, self.log_level)
      assert cm.machine_checksum, ("Could not find checksum for machine %s" %
                                   machine_name)
      self._all_machines.append(cm)

  def AcquireMachine(self, chromeos_image, label):
    """Return the first machine not handed out yet, or None if all are.

    Both arguments are ignored.
    """
    for machine in self._all_machines:
      if not machine.locked:
        machine.locked = True
        return machine
    return None

  def ImageMachine(self, machine_name, label):
    """Pretend the imaging succeeded; always returns 0."""
    return 0

  def ReleaseMachine(self, machine):
    """Clear machine's handed-out flag."""
    machine.locked = False

  # `label` defaults to None so these overrides accept the same calls as the
  # MachineManager methods they replace.
  def GetMachines(self, label=None):
    """Return every added machine; label is ignored."""
    return self._all_machines

  def GetAvailableMachines(self, label=None):
    """Return every added machine; label is ignored."""
    return self._all_machines
501