machine_manager.py revision 0cc4e7790afbd514675801a1ffb90517c147270f
1#!/usr/bin/python
2
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import hashlib
8import image_chromeos
9import lock_machine
10import math
11import os.path
12import re
13import sys
14import threading
15import time
16
17from utils import command_executer
18from utils import logger
19from utils.file_utils import FileUtils
20
21from image_checksummer import ImageChecksummer
22
# Path on each remote machine that MachineManager._TryToLockMachine `cat`s to
# learn the checksum of the image currently installed there.
CHECKSUM_FILE = "/usr/local/osimage_checksum_file"
24
25
class CrosMachine(object):
  """A remote ChromeOS device plus the hardware fingerprint crosperf uses.

  Construction probes the device with remote commands. If the device is not
  reachable, only machine_checksum (None) is set. Otherwise the constructor
  reads /proc/meminfo, /proc/cpuinfo and a machine id, and hashes them into
  machine_checksum and machine_id_checksum.
  """

  def __init__(self, name, chromeos_root):
    self.name = name
    self.image = None  # last image flashed by MachineManager.ImageMachine
    self.checksum = None  # checksum of the image believed to be installed
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    if not self.IsReachable():
      # Only machine_checksum is set for unreachable machines; callers such
      # as MachineManager.AddMachine test it before using the machine.
      self.machine_checksum = None
      return
    self._GetMemoryInfo()
    self._GetCPUInfo()
    self._ComputeMachineChecksumString()
    self._GetMachineID()
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
    self.machine_id_checksum = self._GetMD5Checksum(self.machine_id)

  def IsReachable(self):
    """Return True if a trivial `ls` succeeds on the machine."""
    ce = command_executer.GetCommandExecuter()
    ret = ce.CrosRunCommand("ls",
                            machine=self.name,
                            chromeos_root=self.chromeos_root)
    # CrosRunCommand's result is treated as an exit status: non-zero = failed.
    return not ret

  def _ParseMemoryInfo(self):
    """Estimate installed physical memory from self.meminfo into phys_kbytes.

    Adapted, with different deduction bounds, from
    src/third_party/test/files/client/bin/base_utils.py. MemTotal reports
    usable DRAM in kB after the kernel has reserved an unknown fraction for
    its own tables. The deduction is undone by rounding up to the next
    multiple of a power of two large enough to cover the whole possible
    deduction range. For example, 12GB - 5.1% is rounded back up to 12GB.
    """
    lines = self.meminfo.splitlines()
    memtotal_lines = [l for l in lines if l.startswith("MemTotal")]
    # /proc/meminfo normally starts with MemTotal; fall back to line 0.
    line = memtotal_lines[0] if memtotal_lines else lines[0]
    usable_kbytes = int(line.split()[1])
    mindeduct = 0.005  # 0.5 percent
    maxdeduct = 0.095  # 9.5 percent
    # The deduction range 0.5% .. 9.5% spans about 10% of usable memory. The
    # rounding step is the smallest power of two covering that span; e.g. a
    # machine reporting 4000000 kB is rounded in 512MB steps to 4GB.
    # Finer granularity would require a tighter min..max spread.

    # Increase mem size by at least the min deduction, without rounding.
    min_kbytes = int(usable_kbytes / (1.0 - mindeduct))
    # Worst-case extra amount needed to also cover the max deduction.
    # Clamped to 1 so math.log never receives 0 for tiny inputs.
    round_kbytes = max(1, int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes)
    # Least power of two with round_kbytes <= mod2n < 2 * round_kbytes.
    mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2)))
    # Round min_kbytes up to the next multiple of mod2n.
    phys_kbytes = min_kbytes + mod2n - 1
    phys_kbytes -= phys_kbytes % mod2n  # clear low bits
    self.phys_kbytes = phys_kbytes

  def _GetMemoryInfo(self):
    """Read /proc/meminfo from the machine (as root) and parse it."""
    # TODO(yunlian): when the machine is rebooting it will not return
    # meminfo, and the assert does not catch it either.
    ce = command_executer.GetCommandExecuter()
    command = "cat /proc/meminfo"
    ret, self.meminfo, _ = ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name, username="root", chromeos_root=self.chromeos_root)
    assert ret == 0, "Could not get meminfo from machine: %s" % self.name
    # Still guarded explicitly because asserts are stripped under `python -O`.
    if ret == 0:
      self._ParseMemoryInfo()

  # cpuinfo format is different across architectures;
  # need to find a better way to parse it.
  def _ParseCPUInfo(self, cpuinfo):
    """Placeholder parser; currently ignores cpuinfo and returns 0."""
    return 0

  def _GetCPUInfo(self):
    """Read /proc/cpuinfo from the machine (as root) into self.cpuinfo."""
    ce = command_executer.GetCommandExecuter()
    command = "cat /proc/cpuinfo"
    ret, self.cpuinfo, _ = ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name, username="root", chromeos_root=self.chromeos_root)
    assert ret == 0, "Could not get cpuinfo from machine: %s" % self.name
    if ret == 0:
      self._ParseCPUInfo(self.cpuinfo)

  def _ComputeMachineChecksumString(self):
    """Build the hardware fingerprint string from cpuinfo and phys_kbytes.

    Lines mentioning clock speed or BogoMIPS are dropped. They can differ
    between otherwise identical machines, which would make the fingerprint
    unstable.
    """
    exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"]
    kept = [line for line in self.cpuinfo.splitlines()
            if not any(e in line for e in exclude_lines_list)]
    self.checksum_string = "".join(kept) + " " + str(self.phys_kbytes)

  def _GetMD5Checksum(self, ss):
    """Return the MD5 hex digest of ss, or "" if ss is empty/None.

    Text is UTF-8 encoded first. hashlib requires bytes on Python 3 and
    cannot hash non-ASCII unicode on Python 2.
    """
    if not ss:
      return ""
    if not isinstance(ss, bytes):
      ss = ss.encode("utf-8")
    return hashlib.md5(ss).hexdigest()

  def _GetMachineID(self):
    """Set self.machine_id from VPD "Product" info or network MAC lines.

    Tries `dump_vpd_log` first (first line containing "Product"). It then
    falls back to `ifconfig` lines containing "HWaddr", then "ether", joined
    with "_".

    Raises:
      AssertionError: if none of the sources yields an id.
    """
    ce = command_executer.GetCommandExecuter()
    command = "dump_vpd_log --full --stdout"
    _, if_out, _ = ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name, chromeos_root=self.chromeos_root)
    products = [l for l in if_out.splitlines() if "Product" in l]
    if products:
      self.machine_id = products[0]
      return
    command = "ifconfig"
    _, if_out, _ = ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name, chromeos_root=self.chromeos_root)
    lines = if_out.splitlines()
    # Older ifconfig prints "HWaddr"; newer versions print "ether".
    for marker in ("HWaddr", "ether"):
      matches = [l for l in lines if marker in l]
      if matches:
        self.machine_id = "_".join(matches)
        return
    assert 0, "Could not get machine_id from machine: %s" % self.name

  def __str__(self):
    fields = [self.name, str(self.image), str(self.checksum),
              str(self.locked), str(self.released_time)]
    return ", ".join(fields)
161
162
class MachineManager(object):
  """Tracks, locks, images and hands out remote machines for crosperf runs.

  _all_machines holds every reachable machine added via AddMachine.
  _machines holds the subset this manager has locked. Most bookkeeping is
  guarded by the re-entrant self._lock; imaging is serialized separately on
  self.image_lock.
  """

  def __init__(self, chromeos_root, acquire_timeout):
    """Create an empty manager.

    Args:
      chromeos_root: default ChromeOS source root for remote commands.
      acquire_timeout: seconds AcquireMachine may spend waiting. The budget
        is stored on the instance and consumed across calls.
    """
    self._lock = threading.RLock()
    self._all_machines = []
    self._machines = []
    self.image_lock = threading.Lock()
    self.num_reimages = 0
    self.chromeos_root = chromeos_root
    self.machine_checksum = {}
    self.machine_checksum_string = {}
    self.acquire_timeout = acquire_timeout
    # Without the shared locks directory, machines are considered locked
    # locally and lock_machine is never used to lock/unlock them.
    self.no_lock = not os.path.isdir(lock_machine.Machine.LOCKS_DIR)
    self._initialized_machines = []

  def ImageMachine(self, machine, label):
    """Flash label's image onto machine unless it already has it.

    Returns:
      None if the machine's recorded checksum already matches the image.
      Otherwise the (falsy) image_chromeos exit status on success.

    Raises:
      Exception: if imaging fails, even after a reboot and one retry.
    """
    if label.image_type == "local":
      checksum = ImageChecksummer().Checksum(label)
    elif label.image_type == "trybot":
      checksum = machine._GetMD5Checksum(label.chromeos_image)
    else:
      checksum = None

    if checksum and (machine.checksum == checksum):
      return
    chromeos_root = label.chromeos_root or self.chromeos_root
    image_chromeos_args = [image_chromeos.__file__,
                           "--chromeos_root=%s" % chromeos_root,
                           "--image=%s" % label.chromeos_image,
                           "--image_args=%s" % label.image_args,
                           "--remote=%s" % machine.name]
    if label.board:
      image_chromeos_args.append("--board=%s" % label.board)
    image_command = " ".join(["python"] + image_chromeos_args)

    # Currently can't image two machines at once,
    # so imaging is serialized on this lock.
    ce = command_executer.GetCommandExecuter()
    with self.image_lock:
      retval = ce.RunCommand(image_command)
      if retval:
        # First attempt failed: reboot the device (using the same source
        # root as the imaging command) and retry once.
        ce.CrosRunCommand("reboot && exit", machine=machine.name,
                          chromeos_root=chromeos_root)
        time.sleep(60)
        retval = ce.RunCommand(image_command)
      if retval:
        raise Exception("Could not image machine: '%s'." % machine.name)
      self.num_reimages += 1
      machine.checksum = checksum
      machine.image = label.chromeos_image

    return retval

  def ComputeCommonCheckSum(self, label):
    """Record the first non-empty machine_checksum among label's machines."""
    for machine in self.GetMachines(label):
      if machine.machine_checksum:
        self.machine_checksum[label.name] = machine.machine_checksum
        break

  def ComputeCommonCheckSumString(self, label):
    """Record the first non-empty checksum_string among label's machines."""
    for machine in self.GetMachines(label):
      if machine.checksum_string:
        self.machine_checksum_string[label.name] = machine.checksum_string
        break

  def _TryToLockMachine(self, cros_machine):
    """Lock cros_machine and add it to _machines unless already held.

    After a successful lock, the image checksum recorded on the device in
    CHECKSUM_FILE is copied into cros_machine.checksum. The checksum is left
    unchanged if that read fails.
    """
    with self._lock:
      assert cros_machine, "Machine can't be None"
      if any(m.name == cros_machine.name for m in self._machines):
        return
      if self.no_lock:
        locked = True
      else:
        locked = lock_machine.Machine(cros_machine.name).Lock(True, sys.argv[0])
      if not locked:
        logger.GetLogger().LogOutput("Couldn't lock: %s" % cros_machine.name)
        return
      self._machines.append(cros_machine)
      ce = command_executer.GetCommandExecuter()
      command = "cat %s" % CHECKSUM_FILE
      ret, out, _ = ce.CrosRunCommand(
          command, return_output=True, chromeos_root=self.chromeos_root,
          machine=cros_machine.name)
      if ret == 0:
        cros_machine.checksum = out.strip()

  # This is called from single threaded mode.
  def AddMachine(self, machine_name):
    """Probe machine_name and track it if it is reachable.

    Raises:
      AssertionError: if machine_name was already added.
    """
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      cm = CrosMachine(machine_name, self.chromeos_root)
      # Unreachable machines come back with a falsy machine_checksum.
      if cm.machine_checksum:
        self._all_machines.append(cm)

  def AreAllMachineSame(self, label):
    """True iff label's machines all share exactly one machine_checksum."""
    checksums = [m.machine_checksum for m in self.GetMachines(label)]
    return len(set(checksums)) == 1

  def RemoveMachine(self, machine_name):
    """Stop holding machine_name and release its external lock, if any."""
    with self._lock:
      self._machines = [m for m in self._machines
                        if m.name != machine_name]
      if self.no_lock:
        # Nothing was locked through lock_machine (see _TryToLockMachine).
        return
      res = lock_machine.Machine(machine_name).Unlock(True)
      if not res:
        logger.GetLogger().LogError("Could not unlock machine: '%s'."
                                    % machine_name)

  @staticmethod
  def _ImageNameChecksum(image_path):
    """Return the MD5 hex digest of image_path (text is UTF-8 encoded)."""
    if not isinstance(image_path, bytes):
      image_path = image_path.encode("utf-8")
    return hashlib.md5(image_path).hexdigest()

  def _ClaimMachine(self, machine):
    """Mark machine as in use by the calling thread and return it."""
    machine.locked = True
    machine.test_run = threading.current_thread()
    return machine

  def AcquireMachine(self, chromeos_image, label):
    """Claim an unlocked machine for label, or return None.

    Waits, consuming self.acquire_timeout, until at least one of label's
    machines is held by this manager. LogFatal is called on timeout or when
    label's machines are not identical. Among unlocked held machines, the
    preference order is:
      1. recorded image checksum equals this image's checksum;
      2. no recorded checksum;
      3. released more than 20 seconds ago.
    """
    if label.image_type == "local":
      image_checksum = ImageChecksummer().Checksum(label)
    elif label.image_type == "trybot":
      image_checksum = self._ImageNameChecksum(chromeos_image)
    else:
      image_checksum = None
    machines = self.GetMachines(label)
    check_interval_time = 120
    with self._lock:
      # Lazily external lock machines
      while self.acquire_timeout >= 0:
        for m in machines:
          # NOTE(review): `machines` comes from GetMachines(), i.e. from
          # _all_machines, so new_machine is always False here. Confirm
          # whether `m not in self._machines` was intended before changing.
          new_machine = m not in self._all_machines
          self._TryToLockMachine(m)
          if new_machine:
            m.released_time = time.time()
        if not self.AreAllMachineSame(label):
          logger.GetLogger().LogFatal("-- not all the machine are identical")
        if self.GetAvailableMachines(label):
          break
        sleep_time = max(1, min(self.acquire_timeout, check_interval_time))
        time.sleep(sleep_time)
        self.acquire_timeout -= sleep_time

      if self.acquire_timeout < 0:
        machine_names = [machine.name for machine in machines]
        logger.GetLogger().LogFatal("Could not acquire any of the "
                                    "following machines: '%s'"
                                    % ", ".join(machine_names))

      # Nothing changes between the passes below (each one returns as soon
      # as it claims a machine), so the candidate list is computed once.
      free = [m for m in self.GetAvailableMachines(label) if not m.locked]
      for m in free:
        if image_checksum and (m.checksum == image_checksum):
          return self._ClaimMachine(m)
      for m in free:
        if not m.checksum:
          return self._ClaimMachine(m)
      # This logic ensures that threads waiting on a machine will get a machine
      # with a checksum equal to their image over other threads. This saves time
      # when crosperf initially assigns the machines to threads by minimizing
      # the number of re-images.
      # TODO(asharif): If we centralize the thread-scheduler, we wont need this
      # code and can implement minimal reimaging code more cleanly.
      for m in free:
        if time.time() - m.released_time > 20:
          return self._ClaimMachine(m)
    return None

  def GetAvailableMachines(self, label=None):
    """Return held machines, restricted to label.remote when label is set."""
    if not label:
      return self._machines
    return [m for m in self._machines if m.name in label.remote]

  def GetMachines(self, label=None):
    """Return all added machines, restricted to label.remote when set."""
    if not label:
      return self._all_machines
    return [m for m in self._all_machines if m.name in label.remote]

  def ReleaseMachine(self, machine):
    """Mark the held machine with machine.name as free again.

    Raises:
      AssertionError: if that machine is not currently locked.
    """
    with self._lock:
      for m in self._machines:
        if machine.name == m.name:
          assert m.locked, "Tried to double-release %s" % m.name
          m.released_time = time.time()
          m.locked = False
          m.status = "Available"
          break

  def Cleanup(self):
    """Release the external lock on every held machine (if locking is on)."""
    with self._lock:
      # Unlock all machines.
      for m in self._machines:
        if not self.no_lock:
          res = lock_machine.Machine(m.name).Unlock(True)
          if not res:
            logger.GetLogger().LogError("Could not unlock machine: '%s'."
                                        % m.name)

  def __str__(self):
    with self._lock:
      l = ["MachineManager Status:"]
      l.extend(str(m) for m in self._machines)
      return "\n".join(l)

  def AsString(self):
    """Return a fixed-width status table of the held machines.

    Assumes a machine's test_run thread exposes a `timeline` attribute
    (TODO confirm against callers). Rows that fail to format are emitted
    as empty lines.
    """
    with self._lock:
      stringify_fmt = "%-30s %-10s %-4s %-25s %-32s"
      header = stringify_fmt % ("Machine", "Thread", "Lock", "Status",
                                "Checksum")
      table = [header]
      for m in self._machines:
        if m.test_run:
          test_name = m.test_run.name
          test_status = m.test_run.timeline.GetLastEvent()
        else:
          test_name = ""
          test_status = ""

        try:
          machine_string = stringify_fmt % (m.name,
                                            test_name,
                                            m.locked,
                                            test_status,
                                            m.checksum)
        except Exception:
          machine_string = ""
        table.append(machine_string)
      return "Machine Status:\n%s" % "\n".join(table)

  def GetAllCPUInfo(self, labels):
    """Get cpuinfo for labels, merge them if their cpuinfo are the same.

    Each label is represented by the first added machine in its remote
    list. Labels whose machines report identical cpuinfo share one section.
    """
    dic = {}
    for label in labels:
      for machine in self._all_machines:
        if machine.name in label.remote:
          dic.setdefault(machine.cpuinfo, []).append(label.name)
          break
    output = ""
    for key, v in dic.items():
      output += " ".join(v)
      output += "\n-------------------\n"
      output += key
      output += "\n\n\n"
    return output
423
424
class MockCrosMachine(CrosMachine):
  """CrosMachine stand-in for tests that never contacts a real device.

  The machine checksum is derived from the name with every digit removed.
  In tests we therefore assume e.g. "lumpy1" and "lumpy2" are the same
  machine.
  """

  def __init__(self, name, chromeos_root):
    # Deliberately skips CrosMachine.__init__, which runs remote commands.
    self.name = name
    self.image = None
    self.checksum = None
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    # Raw string: "\d" in a plain literal is an invalid escape sequence.
    self.checksum_string = re.sub(r"\d", "", name)
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
437
438
class MockMachineManager(MachineManager):
  """MachineManager for tests: no external locks, imaging or remote probes."""

  def __init__(self, chromeos_root, acquire_timeout):
    super(MockMachineManager, self).__init__(chromeos_root, acquire_timeout)

  def _TryToLockMachine(self, cros_machine):
    """Pretend the lock succeeded and the device reports no image checksum."""
    self._machines.append(cros_machine)
    cros_machine.checksum = ""

  def AddMachine(self, machine_name):
    """Track a MockCrosMachine for machine_name.

    Raises:
      AssertionError: on a duplicate name or an empty derived checksum.
    """
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      cm = MockCrosMachine(machine_name, self.chromeos_root)
      assert cm.machine_checksum, ("Could not find checksum for machine %s" %
                                   machine_name)
      self._all_machines.append(cm)

  def AcquireMachine(self, chromeos_image, label):
    """Lock and return the first unlocked machine, ignoring image and label."""
    for machine in self._all_machines:
      if not machine.locked:
        machine.locked = True
        return machine
    return None

  def ImageMachine(self, machine_name, label):
    """No-op imaging that always reports success (0)."""
    return 0

  def ReleaseMachine(self, machine):
    machine.locked = False

  # label defaults to None to match the MachineManager signatures; the mock
  # ignores it and always returns every added machine.
  def GetMachines(self, label=None):
    return self._all_machines

  def GetAvailableMachines(self, label=None):
    return self._all_machines
475