machine_manager.py revision 4467f004e7f0854963bec90daff1879fbd9d2fec
1#!/usr/bin/python
2#
3# Copyright 2012 Google Inc. All Rights Reserved.
4
5import hashlib
6import image_chromeos
7import lock_machine
8import math
9import os.path
10import re
11import sys
12import threading
13import time
14
15from utils import command_executer
16from utils import logger
17from utils.file_utils import FileUtils
18
19from image_checksummer import ImageChecksummer
20
21CHECKSUM_FILE = "/usr/local/osimage_checksum_file"
22
23
24class CrosMachine(object):
25  def __init__(self, name, chromeos_root):
26    self.name = name
27    self.image = None
28    self.checksum = None
29    self.locked = False
30    self.released_time = time.time()
31    self.autotest_run = None
32    self.chromeos_root = chromeos_root
33    self._GetMemoryInfo()
34    self._GetCPUInfo()
35    self._ComputeMachineChecksumString()
36    self._GetMachineID()
37    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
38    self.machine_id_checksum = self._GetMD5Checksum(self.machine_id)
39
40  def _ParseMemoryInfo(self):
41    line = self.meminfo.splitlines()[0]
42    usable_kbytes = int(line.split()[1])
43    # This code is from src/third_party/autotest/files/client/bin/base_utils.py
44    # usable_kbytes is system's usable DRAM in kbytes,
45    #   as reported by memtotal() from device /proc/meminfo memtotal
46    #   after Linux deducts 1.5% to 9.5% for system table overhead
47    # Undo the unknown actual deduction by rounding up
48    #   to next small multiple of a big power-of-two
49    #   eg  12GB - 5.1% gets rounded back up to 12GB
50    mindeduct = 0.005  # 0.5 percent
51    maxdeduct = 0.095  # 9.5 percent
52    # deduction range 1.5% .. 9.5% supports physical mem sizes
53    #    6GB .. 12GB in steps of .5GB
54    #   12GB .. 24GB in steps of 1 GB
55    #   24GB .. 48GB in steps of 2 GB ...
56    # Finer granularity in physical mem sizes would require
57    #   tighter spread between min and max possible deductions
58
59    # increase mem size by at least min deduction, without rounding
60    min_kbytes = int(usable_kbytes / (1.0 - mindeduct))
61    # increase mem size further by 2**n rounding, by 0..roundKb or more
62    round_kbytes = int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes
63    # find least binary roundup 2**n that covers worst-cast roundKb
64    mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2)))
65    # have round_kbytes <= mod2n < round_kbytes*2
66    # round min_kbytes up to next multiple of mod2n
67    phys_kbytes = min_kbytes + mod2n - 1
68    phys_kbytes -= phys_kbytes % mod2n  # clear low bits
69    self.phys_kbytes = phys_kbytes
70
71  def _GetMemoryInfo(self):
72    #TODO yunlian: when the machine in rebooting, it will not return
73    #meminfo, the assert does not catch it either
74    ce = command_executer.GetCommandExecuter()
75    command = "cat /proc/meminfo"
76    ret, self.meminfo, _ = ce.CrosRunCommand(
77        command, return_output=True,
78        machine=self.name, username="root", chromeos_root=self.chromeos_root)
79    assert ret == 0, "Could not get meminfo from machine: %s" % self.name
80    if ret == 0:
81      self._ParseMemoryInfo()
82
83  #cpuinfo format is different across architecture
84  #need to find a better way to parse it.
85  def _ParseCPUInfo(self,cpuinfo):
86    return 0
87
88  def _GetCPUInfo(self):
89    ce = command_executer.GetCommandExecuter()
90    command = "cat /proc/cpuinfo"
91    ret, self.cpuinfo, _ = ce.CrosRunCommand(
92        command, return_output=True,
93        machine=self.name, username="root", chromeos_root=self.chromeos_root)
94    assert ret == 0, "Could not get cpuinfo from machine: %s" % self.name
95    if ret == 0:
96      self._ParseCPUInfo(self.cpuinfo)
97
98  def _ComputeMachineChecksumString(self):
99    self.checksum_string = ""
100    exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"]
101    for line in self.cpuinfo.splitlines():
102      if not any([e in line for e in exclude_lines_list]):
103        self.checksum_string += line
104    self.checksum_string += " " + str(self.phys_kbytes)
105
106  def _GetMD5Checksum(self, ss):
107    if ss:
108      return hashlib.md5(ss).hexdigest()
109    else:
110      return ""
111
112  def _GetMachineID(self):
113    ce = command_executer.GetCommandExecuter()
114    command = "ifconfig"
115    ret, if_out, _ = ce.CrosRunCommand(
116        command, return_output=True,
117        machine=self.name, chromeos_root=self.chromeos_root)
118    b = if_out.splitlines()
119    a = [l for l in b if "lan" in l]
120    self.machine_id = a[0]
121    assert ret == 0, "Could not get machine_id from machine: %s" % self.name
122
123  def __str__(self):
124    l = []
125    l.append(self.name)
126    l.append(str(self.image))
127    l.append(str(self.checksum))
128    l.append(str(self.locked))
129    l.append(str(self.released_time))
130    return ", ".join(l)
131
132
133class MachineManager(object):
134  def __init__(self, chromeos_root):
135    self._lock = threading.RLock()
136    self._all_machines = []
137    self._machines = []
138    self.image_lock = threading.Lock()
139    self.num_reimages = 0
140    self.chromeos_root = None
141    self.machine_checksum = {}
142    self.machine_checksum_string = {}
143
144    if os.path.isdir(lock_machine.Machine.LOCKS_DIR):
145      self.no_lock = False
146    else:
147      self.no_lock = True
148    self._initialized_machines = []
149    self.chromeos_root = chromeos_root
150
151  def ImageMachine(self, machine, label):
152    checksum = ImageChecksummer().Checksum(label.chromeos_image)
153    if machine.checksum == checksum:
154      return
155    chromeos_root = label.chromeos_root
156    if not chromeos_root:
157      chromeos_root = self.chromeos_root
158    image_chromeos_args = [image_chromeos.__file__,
159                           "--chromeos_root=%s" % chromeos_root,
160                           "--image=%s" % label.chromeos_image,
161                           "--image_args=%s" % label.image_args,
162                           "--remote=%s" % machine.name]
163    if label.board:
164      image_chromeos_args.append("--board=%s" % label.board)
165
166    # Currently can't image two machines at once.
167    # So have to serialized on this lock.
168    ce = command_executer.GetCommandExecuter()
169    with self.image_lock:
170      retval = ce.RunCommand(" ".join(["python"] + image_chromeos_args))
171      if retval:
172        raise Exception("Could not image machine: '%s'." % machine.name)
173      else:
174        self.num_reimages += 1
175      machine.checksum = checksum
176      machine.image = label.chromeos_image
177
178    return retval
179
180  def ComputeCommonCheckSum(self, label):
181    for machine in self.GetMachines(label):
182      if machine.machine_checksum:
183        self.machine_checksum[label.name] = machine.machine_checksum
184        break
185
186  def ComputeCommonCheckSumString(self, label):
187    for machine in self.GetMachines(label):
188      if machine.checksum_string:
189        self.machine_checksum_string[label.name] = machine.checksum_string
190        break
191
192  def _TryToLockMachine(self, cros_machine):
193    with self._lock:
194      assert cros_machine, "Machine can't be None"
195      for m in self._machines:
196        assert m.name != cros_machine.name, (
197            "Tried to double-lock %s" % cros_machine.name)
198      if self.no_lock:
199        locked = True
200      else:
201        locked = lock_machine.Machine(cros_machine.name).Lock(True, sys.argv[0])
202      if locked:
203        self._machines.append(cros_machine)
204        ce = command_executer.GetCommandExecuter()
205        command = "cat %s" % CHECKSUM_FILE
206        ret, out, _ = ce.CrosRunCommand(
207            command, return_output=True, chromeos_root=self.chromeos_root,
208            machine=cros_machine.name)
209        if ret == 0:
210          cros_machine.checksum = out.strip()
211      else:
212        logger.GetLogger().LogOutput("Couldn't lock: %s" % cros_machine.name)
213
214  # This is called from single threaded mode.
215  def AddMachine(self, machine_name):
216    with self._lock:
217      for m in self._all_machines:
218        assert m.name != machine_name, "Tried to double-add %s" % machine_name
219      cm = CrosMachine(machine_name, self.chromeos_root)
220      assert cm.machine_checksum, ("Could not find checksum for machine %s" %
221                           machine_name)
222      self._all_machines.append(cm)
223
224  def AreAllMachineSame(self, label):
225    checksums = [m.machine_checksum for m in self.GetMachines(label)]
226    return len(set(checksums)) == 1
227
228  def AcquireMachine(self, chromeos_image, label):
229    image_checksum = ImageChecksummer().Checksum(chromeos_image)
230    machines = self.GetMachines(label)
231    with self._lock:
232      # Lazily external lock machines
233
234      for m in machines:
235        if m not in self._initialized_machines:
236          self._initialized_machines.append(m)
237          self._TryToLockMachine(m)
238          m.released_time = time.time()
239      if not self.AreAllMachineSame(label):
240        logger.GetLogger().LogFatal("-- not all the machine are identical")
241      if not self.GetAvailableMachines(label):
242        machine_names = []
243        for machine in machines:
244          machine_names.append(machine.name)
245        logger.GetLogger().LogFatal("Could not acquire any of the "
246                                  "following machines: '%s'"
247                                  % ", ".join(machine_names))
248
249###      for m in self._machines:
250###        if (m.locked and time.time() - m.released_time < 10 and
251###            m.checksum == image_checksum):
252###          return None
253      for m in [machine for machine in self.GetAvailableMachines(label)
254                if not machine.locked]:
255        if m.checksum == image_checksum:
256          m.locked = True
257          m.autotest_run = threading.current_thread()
258          return m
259      for m in [machine for machine in self.GetAvailableMachines(label)
260                if not machine.locked]:
261        if not m.checksum:
262          m.locked = True
263          m.autotest_run = threading.current_thread()
264          return m
265      # This logic ensures that threads waiting on a machine will get a machine
266      # with a checksum equal to their image over other threads. This saves time
267      # when crosperf initially assigns the machines to threads by minimizing
268      # the number of re-images.
269      # TODO(asharif): If we centralize the thread-scheduler, we wont need this
270      # code and can implement minimal reimaging code more cleanly.
271      for m in [machine for machine in self.GetAvailableMachines(label)
272                if not machine.locked]:
273        if time.time() - m.released_time > 20:
274          m.locked = True
275          m.autotest_run = threading.current_thread()
276          return m
277    return None
278
279  def GetAvailableMachines(self, label=None):
280    if not label:
281      return self._machines
282    return [m for m in self._machines if m.name in label.remote]
283
284  def GetMachines(self, label=None):
285    if not label:
286      return self._all_machines
287    return [m for m in self._all_machines if m.name in label.remote]
288
289  def ReleaseMachine(self, machine):
290    with self._lock:
291      for m in self._machines:
292        if machine.name == m.name:
293          assert m.locked == True, "Tried to double-release %s" % m.name
294          m.released_time = time.time()
295          m.locked = False
296          m.status = "Available"
297          break
298
299  def Cleanup(self):
300    with self._lock:
301      # Unlock all machines.
302      for m in self._machines:
303        if not self.no_lock:
304          res = lock_machine.Machine(m.name).Unlock(True)
305          if not res:
306            logger.GetLogger().LogError("Could not unlock machine: '%s'."
307                                        % m.name)
308
309  def __str__(self):
310    with self._lock:
311      l = ["MachineManager Status:"]
312      for m in self._machines:
313        l.append(str(m))
314      return "\n".join(l)
315
316  def AsString(self):
317    with self._lock:
318      stringify_fmt = "%-30s %-10s %-4s %-25s %-32s"
319      header = stringify_fmt % ("Machine", "Thread", "Lock", "Status",
320                                "Checksum")
321      table = [header]
322      for m in self._machines:
323        if m.autotest_run:
324          autotest_name = m.autotest_run.name
325          autotest_status = m.autotest_run.timeline.GetLastEvent()
326        else:
327          autotest_name = ""
328          autotest_status = ""
329
330        try:
331          machine_string = stringify_fmt % (m.name,
332                                            autotest_name,
333                                            m.locked,
334                                            autotest_status,
335                                            m.checksum)
336        except Exception:
337          machine_string = ""
338        table.append(machine_string)
339      return "Machine Status:\n%s" % "\n".join(table)
340
341  def GetAllCPUInfo(self, labels):
342    """Get cpuinfo for labels, merge them if their cpuinfo are the same."""
343    dic = {}
344    for label in labels:
345      for machine in self._all_machines:
346        if machine.name in label.remote:
347          if machine.cpuinfo not in dic:
348            dic[machine.cpuinfo] = [label.name]
349          else:
350            dic[machine.cpuinfo].append(label.name)
351          break
352    output = ""
353    for key, v in dic.items():
354      output += " ".join(v)
355      output += "\n-------------------\n"
356      output += key
357      output += "\n\n\n"
358    return output
359
360
361class MockCrosMachine(CrosMachine):
362  def __init__(self, name, chromeos_root):
363    self.name = name
364    self.image = None
365    self.checksum = None
366    self.locked = False
367    self.released_time = time.time()
368    self.autotest_run = None
369    self.chromeos_root = chromeos_root
370    self.checksum_string = re.sub("\d", "", name)
371    #In test, we assume "lumpy1", "lumpy2" are the same machine.
372    self.machine_checksum =  self._GetMD5Checksum(self.checksum_string)
373
374
375class MockMachineManager(MachineManager):
376
377  def __init__(self, chromeos_root):
378    super(MockMachineManager, self).__init__(chromeos_root)
379
380  def _TryToLockMachine(self, cros_machine):
381    self._machines.append(cros_machine)
382    cros_machine.checksum = ""
383
384  def AddMachine(self, machine_name):
385    with self._lock:
386      for m in self._all_machines:
387        assert m.name != machine_name, "Tried to double-add %s" % machine_name
388      cm = MockCrosMachine(machine_name, self.chromeos_root)
389      assert cm.machine_checksum, ("Could not find checksum for machine %s" %
390                                   machine_name)
391      self._all_machines.append(cm)
392
393  def AcquireMachine(self, chromeos_image, label):
394    for machine in self._all_machines:
395      if not machine.locked:
396        machine.locked = True
397        return machine
398    return None
399
400  def ImageMachine(self, machine_name, label):
401    return 0
402
403  def ReleaseMachine(self, machine):
404    machine.locked = False
405
406  def GetMachines(self, label):
407    return self._all_machines
408
409  def GetAvailableMachines(self, label):
410    return self._all_machines
411