machine_manager.py revision 5c09fc2966ac49263ce7154c2905f2a86aeda297
1#!/usr/bin/python
2
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import hashlib
8import image_chromeos
9import lock_machine
10import math
11import os.path
12import re
13import sys
14import threading
15import time
16
17
18from utils import command_executer
19from utils import logger
20from utils import misc
21from utils.file_utils import FileUtils
22
23from image_checksummer import ImageChecksummer
24
# Path on the remote device whose contents MachineManager reads (via
# `cat`) and stores as the machine's current image checksum.
CHECKSUM_FILE = "/usr/local/osimage_checksum_file"
26
class NonMatchingMachines(Exception):
  """Raised when a label's machines do not all share one hardware checksum."""
29
class MissingLocksDirectory(Exception):
  """Raised when the machine locks directory cannot be found or accessed."""
32
class CrosMachine(object):
  """A remote ChromeOS device plus the hardware fingerprint used to group it.

  When reachable, the device is probed for /proc/meminfo, /proc/cpuinfo and a
  machine id.  MD5 checksums of these (machine_checksum, machine_id_checksum)
  let the manager tell whether several machines have identical hardware.
  """

  def __init__(self, name, chromeos_root, log_level, cmd_exec=None):
    self.name = name
    self.image = None
    # Checksum of the image currently on the device (set by the manager).
    self.checksum = None
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    self.log_level = log_level
    self.ce = cmd_exec or command_executer.GetCommandExecuter(
        log_level=self.log_level)
    self.SetUpChecksumInfo()

  def SetUpChecksumInfo(self):
    """Probe the device and compute its hardware checksums.

    Sets machine_checksum to None (and computes nothing else) when the device
    is unreachable.
    """
    if not self.IsReachable():
      self.machine_checksum = None
      return
    self._GetMemoryInfo()
    self._GetCPUInfo()
    self._ComputeMachineChecksumString()
    self._GetMachineID()
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
    self.machine_id_checksum = self._GetMD5Checksum(self.machine_id)

  def IsReachable(self):
    """Return True if a trivial remote command ("ls") exits with status 0."""
    command = "ls"
    ret = self.ce.CrosRunCommand(command,
                                 machine=self.name,
                                 chromeos_root=self.chromeos_root)
    return not ret

  def _ParseMemoryInfo(self):
    """Estimate installed DRAM from self.meminfo and store it in phys_kbytes.

    The value on the first line of /proc/meminfo (MemTotal, in kB) is rounded
    up so that machines with the same installed memory get the same value
    even though the kernel reserves slightly different amounts.
    """
    line = self.meminfo.splitlines()[0]
    usable_kbytes = int(line.split()[1])
    # This code is from src/third_party/test/files/client/bin/base_utils.py
    # usable_kbytes is the system's usable DRAM in kbytes, as reported by
    # MemTotal in the device's /proc/meminfo, i.e. after Linux deducts some
    # percentage for system table overhead.
    # Undo the unknown actual deduction by rounding up to the next small
    # multiple of a big power of two,
    #   e.g. 12GB - 5.1% gets rounded back up to 12GB.
    mindeduct = 0.005  # 0.5 percent
    maxdeduct = 0.095  # 9.5 percent
    # The assumed deduction range is mindeduct .. maxdeduct (0.5% .. 9.5%).
    # The wider this spread, the coarser the physical memory sizes that can
    # still be told apart.

    # Increase mem size by at least the min deduction, without rounding.
    min_kbytes = int(usable_kbytes / (1.0 - mindeduct))
    # Increase mem size further by 2**n rounding, by 0..round_kbytes or more.
    round_kbytes = int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes
    # Find the least binary roundup 2**n that covers worst-case round_kbytes.
    mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2)))
    # Now round_kbytes <= mod2n < round_kbytes * 2.
    # Round min_kbytes up to the next multiple of mod2n.
    phys_kbytes = min_kbytes + mod2n - 1
    phys_kbytes -= phys_kbytes % mod2n  # clear low bits
    self.phys_kbytes = phys_kbytes

  def _GetMemoryInfo(self):
    """Read /proc/meminfo from the device and parse it.

    Raises:
      AssertionError: if the remote command fails.  Raised explicitly (not via
        `assert`) so the check still happens under `python -O`.
    """
    # TODO yunlian: when the machine is rebooting, it will not return
    # meminfo, and this check does not catch it either.
    command = "cat /proc/meminfo"
    ret, self.meminfo, _ = self.ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name,
        username="root",
        chromeos_root=self.chromeos_root)
    if ret != 0:
      raise AssertionError("Could not get meminfo from machine: %s"
                           % self.name)
    self._ParseMemoryInfo()

  # cpuinfo format is different across architectures;
  # need to find a better way to parse it.
  def _ParseCPUInfo(self, cpuinfo):
    """Placeholder: cpuinfo is not parsed; always returns 0."""
    return 0

  def _GetCPUInfo(self):
    """Read /proc/cpuinfo from the device into self.cpuinfo.

    Raises:
      AssertionError: if the remote command fails (raised explicitly so the
        check survives `python -O`).
    """
    command = "cat /proc/cpuinfo"
    ret, self.cpuinfo, _ = self.ce.CrosRunCommand(
        command, return_output=True,
        machine=self.name,
        username="root",
        chromeos_root=self.chromeos_root)
    if ret != 0:
      raise AssertionError("Could not get cpuinfo from machine: %s"
                           % self.name)
    self._ParseCPUInfo(self.cpuinfo)

  def _ComputeMachineChecksumString(self):
    """Build checksum_string from cpuinfo and the rounded memory size.

    cpuinfo lines mentioning clock-speed figures (which can vary between
    otherwise identical machines) are dropped; the remaining lines are
    concatenated without separators and followed by " <phys_kbytes>".
    """
    exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"]
    kept_lines = [line for line in self.cpuinfo.splitlines()
                  if not any(e in line for e in exclude_lines_list)]
    self.checksum_string = "".join(kept_lines) + " " + str(self.phys_kbytes)

  def _GetMD5Checksum(self, ss):
    """Return the hex MD5 digest of ss, or "" when ss is empty or None.

    ss may be bytes or text; text is encoded as UTF-8 first (hashlib only
    accepts bytes-like input on Python 3).
    """
    if not ss:
      return ""
    if not isinstance(ss, bytes):
      ss = ss.encode("utf-8")
    return hashlib.md5(ss).hexdigest()

  def _GetMachineID(self):
    """Set self.machine_id to a string that identifies the device.

    Uses the first "Product" line of `dump_vpd_log --full --stdout`; failing
    that, all "HWaddr" lines of `ifconfig` joined with "_", and failing that,
    all "ether" lines of `ifconfig` joined with "_".

    Raises:
      AssertionError: if none of the above yields anything.
    """
    command = "dump_vpd_log --full --stdout"
    _, vpd_out, _ = self.ce.CrosRunCommand(command, return_output=True,
                                           machine=self.name,
                                           chromeos_root=self.chromeos_root)
    product_lines = [l for l in vpd_out.splitlines() if "Product" in l]
    if product_lines:
      self.machine_id = product_lines[0]
      return
    command = "ifconfig"
    _, if_out, _ = self.ce.CrosRunCommand(command, return_output=True,
                                          machine=self.name,
                                          chromeos_root=self.chromeos_root)
    if_lines = if_out.splitlines()
    # Older ifconfig prints "HWaddr", newer versions print "ether".
    for marker in ("HWaddr", "ether"):
      matches = [l for l in if_lines if marker in l]
      if matches:
        self.machine_id = "_".join(matches)
        return
    raise AssertionError("Could not get machine_id from machine: %s"
                         % self.name)

  def __str__(self):
    """Return "name, image, checksum, locked, released_time"."""
    return ", ".join([self.name,
                      str(self.image),
                      str(self.checksum),
                      str(self.locked),
                      str(self.released_time)])
172
173
class MachineManager(object):
  """Tracks, externally locks, images and hands out ChromeOS test machines.

  Machines are registered with AddMachine(), locked through the lock_machine
  module when AcquireMachine() first needs them, and handed to one caller at a
  time (CrosMachine.locked).  Shared state is guarded by self._lock; imaging
  is additionally serialized through self.image_lock.
  """

  def __init__(self, chromeos_root, acquire_timeout, log_level, locks_dir,
               cmd_exec=None, lgr=None):
    """Create a manager.

    Args:
      chromeos_root: Default ChromeOS checkout used to reach the machines.
      acquire_timeout: Seconds AcquireMachine() may spend waiting for a
        machine.  The value is decremented in place, so the budget is shared
        by all AcquireMachine() calls on this manager.
      log_level: Logging level; most progress messages of this class are only
        logged when it is not "verbose".
      locks_dir: Directory holding the machine lock files.
      cmd_exec: Optional command executer; a new one is created if omitted.
      lgr: Optional logger; logger.GetLogger() is used if omitted.

    Raises:
      MissingLocksDirectory: if locks_dir is not an existing directory.
    """
    self._lock = threading.RLock()
    self._all_machines = []  # every registered CrosMachine
    self._machines = []  # machines this manager holds an external lock on
    self.image_lock = threading.Lock()
    self.num_reimages = 0
    self.chromeos_root = chromeos_root
    self.machine_checksum = {}  # label name -> common machine checksum
    self.machine_checksum_string = {}  # label name -> common checksum string
    self.acquire_timeout = acquire_timeout
    self.log_level = log_level
    self.locks_dir = locks_dir
    self.ce = cmd_exec or command_executer.GetCommandExecuter(
        log_level=self.log_level)
    self.logger = lgr or logger.GetLogger()
    self._initialized_machines = []

    if self.locks_dir != lock_machine.Machine.LOCKS_DIR:
      msg = ("WARNING:  If you use your own locks directory, there is no"
             " guarantee that someone else might not hold a lock on the same"
             " machine in a different locks directory.")
      self.logger.LogOutput(msg)

    if not os.path.isdir(self.locks_dir):
      raise MissingLocksDirectory("Cannot access locks directory: %s"
                                  % self.locks_dir)

  def _RunImageCommand(self, image_chromeos_args):
    """Run image_chromeos with the given args; return its exit status."""
    if self.log_level != "verbose":
      self.logger.LogOutput("Pushing image onto machine.")
      self.logger.LogOutput("CMD : python %s "
                            % " ".join(image_chromeos_args))
    return self.ce.RunCommand(" ".join(["python"] + image_chromeos_args))

  def ImageMachine(self, machine, label):
    """Put label's image on `machine` unless it is already there.

    If the first imaging attempt fails, the machine is rebooted and imaging
    is retried once.

    Returns:
      None if the machine's checksum already matches the image, else the
      (zero) exit status of the successful imaging command.

    Raises:
      Exception: if both imaging attempts fail.
    """
    if label.image_type == "local":
      checksum = ImageChecksummer().Checksum(label, self.log_level)
    elif label.image_type == "trybot":
      checksum = machine._GetMD5Checksum(label.chromeos_image)
    else:
      checksum = None

    # Nothing to do if the machine already carries this exact image.
    if checksum and (machine.checksum == checksum):
      return
    chromeos_root = label.chromeos_root or self.chromeos_root
    image_chromeos_args = [image_chromeos.__file__,
                           "--chromeos_root=%s" % chromeos_root,
                           "--image=%s" % label.chromeos_image,
                           "--image_args=%s" % label.image_args,
                           "--remote=%s" % machine.name,
                           "--logging_level=%s" % self.log_level]
    if label.board:
      image_chromeos_args.append("--board=%s" % label.board)

    # Currently can't image two machines at once, so serialize on this lock.
    with self.image_lock:
      # Quiet the shared command executer while imaging unless verbose output
      # was requested.  Saving/restoring under image_lock keeps concurrent
      # ImageMachine() calls from clobbering each other's saved level, and the
      # finally clause restores it even when imaging fails.
      save_ce_log_level = self.ce.log_level
      if self.log_level != "verbose":
        self.ce.log_level = "average"
      try:
        retval = self._RunImageCommand(image_chromeos_args)
        if retval:
          # First attempt failed: reboot the machine and retry once.
          cmd = "reboot && exit"
          if self.log_level != "verbose":
            self.logger.LogOutput("reboot & exit.")
          self.ce.CrosRunCommand(cmd, machine=machine.name,
                                 chromeos_root=self.chromeos_root)
          time.sleep(60)
          retval = self._RunImageCommand(image_chromeos_args)
        if retval:
          raise Exception("Could not image machine: '%s'." % machine.name)
        self.num_reimages += 1
        machine.checksum = checksum
        machine.image = label.chromeos_image
      finally:
        self.ce.log_level = save_ce_log_level
    return retval

  def ComputeCommonCheckSum(self, label):
    """Record the first non-empty machine_checksum among label's machines."""
    for machine in self.GetMachines(label):
      if machine.machine_checksum:
        self.machine_checksum[label.name] = machine.machine_checksum
        break

  def ComputeCommonCheckSumString(self, label):
    """Record the first non-empty checksum_string among label's machines."""
    for machine in self.GetMachines(label):
      if machine.checksum_string:
        self.machine_checksum_string[label.name] = machine.checksum_string
        break

  def _TryToLockMachine(self, cros_machine):
    """Take the external lock on cros_machine unless this manager holds it.

    On success the machine joins self._machines and its image checksum is
    read from CHECKSUM_FILE on the device (left unchanged if that read fails).
    """
    with self._lock:
      assert cros_machine, "Machine can't be None"
      if any(m.name == cros_machine.name for m in self._machines):
        return
      locked = lock_machine.Machine(cros_machine.name,
                                    self.locks_dir).Lock(True, sys.argv[0])
      if not locked:
        self.logger.LogOutput("Couldn't lock: %s" % cros_machine.name)
        return
      self._machines.append(cros_machine)
      command = "cat %s" % CHECKSUM_FILE
      ret, out, _ = self.ce.CrosRunCommand(
          command, return_output=True, chromeos_root=self.chromeos_root,
          machine=cros_machine.name)
      if ret == 0:
        cros_machine.checksum = out.strip()

  # This is called from single threaded mode.
  def AddMachine(self, machine_name):
    """Register machine_name, probing its hardware characteristics.

    Machines whose probe yields no machine_checksum (e.g. unreachable ones)
    are not added.
    """
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      # Log once per added machine (this used to sit inside the loop above).
      if self.log_level != "verbose":
        self.logger.LogOutput("Setting up remote access to %s"
                              % machine_name)
        self.logger.LogOutput("Checking machine characteristics for %s"
                              % machine_name)
      cm = CrosMachine(machine_name, self.chromeos_root, self.log_level)
      if cm.machine_checksum:
        self._all_machines.append(cm)

  def AreAllMachineSame(self, label):
    """Return True iff label's machines share exactly one machine_checksum."""
    checksums = [m.machine_checksum for m in self.GetMachines(label)]
    return len(set(checksums)) == 1

  def RemoveMachine(self, machine_name):
    """Stop using machine_name and release its external lock."""
    with self._lock:
      self._machines = [m for m in self._machines
                        if m.name != machine_name]
      res = lock_machine.Machine(machine_name,
                                 self.locks_dir).Unlock(True)

      if not res:
        self.logger.LogError("Could not unlock machine: '%s'."
                             % machine_name)

  def ForceSameImageToAllMachines(self, label):
    """Image every machine of label and re-probe its hardware checksums."""
    machines = self.GetMachines(label)
    for m in machines:
      self.ImageMachine(m, label)
      m.SetUpChecksumInfo()

  def _ClaimMachine(self, machine):
    """Mark machine as taken by the current thread and return it."""
    machine.locked = True
    machine.test_run = threading.current_thread()
    return machine

  def AcquireMachine(self, chromeos_image, label, throw=False):
    """Hand out a free machine for label, externally locking it if needed.

    Waits, in steps of at most 120 seconds that are deducted from
    self.acquire_timeout, until at least one of label's machines is locked by
    this manager.  Among free machines it prefers, in order: one whose
    checksum matches the image, one with no known checksum, then one released
    between 8 and 15 seconds ago.

    Args:
      chromeos_image: Image path; hashed directly for "trybot" labels.
      label: Label whose `remote` list selects the candidate machines.
      throw: If True, raise NonMatchingMachines instead of calling LogFatal
        when the machines are not identical.

    Returns:
      The acquired machine (now `locked`), or None if none qualifies yet.
    """
    if label.image_type == "local":
      image_checksum = ImageChecksummer().Checksum(label, self.log_level)
    elif label.image_type == "trybot":
      image_checksum = hashlib.md5(chromeos_image).hexdigest()
    else:
      image_checksum = None
    machines = self.GetMachines(label)
    check_interval_time = 120
    with self._lock:
      # Lazily external lock machines.
      while self.acquire_timeout >= 0:
        for m in machines:
          # NOTE(review): `machines` is drawn from self._all_machines, so this
          # is always False; `self._machines` may have been intended.
          new_machine = m not in self._all_machines
          self._TryToLockMachine(m)
          if new_machine:
            m.released_time = time.time()
        if not self.AreAllMachineSame(label):
          if not throw:
            # Log fatal message, which calls sys.exit.  Default behavior.
            self.logger.LogFatal("-- not all the machines are identical")
          else:
            # Raise an exception, which can be caught and handled by calling
            # function.
            raise NonMatchingMachines("Not all the machines are identical")
        if self.GetAvailableMachines(label):
          break
        sleep_time = max(1, min(self.acquire_timeout, check_interval_time))
        time.sleep(sleep_time)
        self.acquire_timeout -= sleep_time

      if self.acquire_timeout < 0:
        machine_names = [machine.name for machine in machines]
        self.logger.LogFatal("Could not acquire any of the "
                             "following machines: '%s'"
                             % ", ".join(machine_names))

      # Nothing below changes `locked` before returning, so the free list can
      # be computed once.
      free_machines = [machine for machine in self.GetAvailableMachines(label)
                       if not machine.locked]
      # Best case: the machine already has this image.
      for m in free_machines:
        if image_checksum and (m.checksum == image_checksum):
          return self._ClaimMachine(m)
      # Next best: a machine whose image is unknown.
      for m in free_machines:
        if not m.checksum:
          return self._ClaimMachine(m)
      # This logic ensures that threads waiting on a machine will get a machine
      # with a checksum equal to their image over other threads. This saves time
      # when crosperf initially assigns the machines to threads by minimizing
      # the number of re-images.
      # TODO(asharif): If we centralize the thread-scheduler, we wont need this
      # code and can implement minimal reimaging code more cleanly.
      for m in free_machines:
        if time.time() - m.released_time > 15:
          # The release time gap is too large, so it is probably in the start
          # stage, we need to reset the released_time.
          m.released_time = time.time()
        elif time.time() - m.released_time > 8:
          return self._ClaimMachine(m)
    return None

  def GetAvailableMachines(self, label=None):
    """Return externally locked machines, optionally limited to label.remote."""
    if not label:
      return self._machines
    return [m for m in self._machines if m.name in label.remote]

  def GetMachines(self, label=None):
    """Return registered machines, optionally limited to label.remote."""
    if not label:
      return self._all_machines
    return [m for m in self._all_machines if m.name in label.remote]

  def ReleaseMachine(self, machine):
    """Mark the held machine with machine's name as free again."""
    with self._lock:
      for m in self._machines:
        if machine.name == m.name:
          assert m.locked, "Tried to double-release %s" % m.name
          m.released_time = time.time()
          m.locked = False
          m.status = "Available"
          break

  def Cleanup(self):
    """Release the external locks on all held machines."""
    with self._lock:
      for m in self._machines:
        res = lock_machine.Machine(m.name, self.locks_dir).Unlock(True)

        if not res:
          self.logger.LogError("Could not unlock machine: '%s'."
                               % m.name)

  def __str__(self):
    with self._lock:
      lines = ["MachineManager Status:"]
      lines.extend(str(m) for m in self._machines)
      return "\n".join(lines)

  def AsString(self):
    """Return a table of held machines: name, thread, lock, status, checksum."""
    with self._lock:
      stringify_fmt = "%-30s %-10s %-4s %-25s %-32s"
      header = stringify_fmt % ("Machine", "Thread", "Lock", "Status",
                                "Checksum")
      table = [header]
      for m in self._machines:
        if m.test_run:
          test_name = m.test_run.name
          test_status = m.test_run.timeline.GetLastEvent()
        else:
          test_name = ""
          test_status = ""

        try:
          machine_string = stringify_fmt % (m.name,
                                            test_name,
                                            m.locked,
                                            test_status,
                                            m.checksum)
        except Exception:
          # Best effort: a row that cannot be formatted is shown blank.
          machine_string = ""
        table.append(machine_string)
      return "Machine Status:\n%s" % "\n".join(table)

  def GetAllCPUInfo(self, labels):
    """Get cpuinfo for labels, merge them if their cpuinfo are the same.

    Each label contributes the cpuinfo of its first registered machine;
    labels with identical cpuinfo text are listed on one header line.
    """
    labels_by_cpuinfo = {}
    for label in labels:
      for machine in self._all_machines:
        if machine.name in label.remote:
          labels_by_cpuinfo.setdefault(machine.cpuinfo, []).append(label.name)
          break
    return "".join("%s\n-------------------\n%s\n\n\n"
                   % (" ".join(label_names), cpuinfo)
                   for cpuinfo, label_names in labels_by_cpuinfo.items())
483
484
class MockCrosMachine(CrosMachine):
  """Offline stand-in for CrosMachine used in tests.

  No device is contacted: the hardware checksum is derived from the machine
  name with all digits removed.
  """

  def __init__(self, name, chromeos_root, log_level):
    # Does not call CrosMachine.__init__, whose SetUpChecksumInfo() would run
    # remote commands.
    self.name = name
    self.image = None
    self.checksum = None
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    # In tests, we assume "lumpy1", "lumpy2" are the same machine.
    # Raw string: "\d" in a plain literal is an invalid escape sequence.
    self.checksum_string = re.sub(r"\d", "", name)
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
    self.log_level = log_level

  def IsReachable(self):
    """Mock machines are always reachable."""
    return True
501
502
class MockMachineManager(MachineManager):
  """MachineManager for tests: MockCrosMachine-based, no real imaging.

  Machines are handed out in registration order without external locking,
  and imaging always reports success.
  """

  def __init__(self, chromeos_root, acquire_timeout, log_level,
               locks_dir=None):
    """Create the mock manager.

    MachineManager.__init__ requires a locks directory and raises
    MissingLocksDirectory if it is not an existing directory, so one is
    always passed through.

    Args:
      chromeos_root: As for MachineManager.
      acquire_timeout: As for MachineManager.
      log_level: As for MachineManager.
      locks_dir: Locks directory; defaults to lock_machine.Machine.LOCKS_DIR.
    """
    if locks_dir is None:
      locks_dir = lock_machine.Machine.LOCKS_DIR
    super(MockMachineManager, self).__init__(chromeos_root, acquire_timeout,
                                             log_level, locks_dir)

  def _TryToLockMachine(self, cros_machine):
    """Record cros_machine as held without taking an external lock."""
    self._machines.append(cros_machine)
    cros_machine.checksum = ""

  def AddMachine(self, machine_name):
    """Register a MockCrosMachine named machine_name."""
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      cm = MockCrosMachine(machine_name, self.chromeos_root, self.log_level)
      assert cm.machine_checksum, ("Could not find checksum for machine %s" %
                                   machine_name)
      self._all_machines.append(cm)

  def AcquireMachine(self, chromeos_image, label, throw=False):
    """Lock and return the first unlocked machine, or None if all are taken."""
    for machine in self._all_machines:
      if not machine.locked:
        machine.locked = True
        return machine
    return None

  def ImageMachine(self, machine_name, label):
    """Pretend to image the machine; always reports success (0)."""
    return 0

  def ReleaseMachine(self, machine):
    """Mark machine as free."""
    machine.locked = False

  def GetMachines(self, label=None):
    """Return all registered machines; label is ignored."""
    return self._all_machines

  def GetAvailableMachines(self, label=None):
    """Return all registered machines; label is ignored."""
    return self._all_machines
540