machine_manager.py revision c152cdb3fa9aca8e1c9c9699913ca17d50f1e42b
1#!/usr/bin/python
2
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import hashlib
8import image_chromeos
9import file_lock_machine
10import math
11import os.path
12import re
13import sys
14import threading
15import time
16
17
18from utils import command_executer
19from utils import logger
20from utils import misc
21from utils.file_utils import FileUtils
22
# Path on the DUT that MachineManager._TryToLockMachine reads with `cat`;
# the stripped contents are stored as the machine's image checksum.
CHECKSUM_FILE = "/usr/local/osimage_checksum_file"
24
class BadChecksum(Exception):
  """Signals that the machines of one label disagree on their checksum."""
28
class BadChecksumString(Exception):
  """Signals that the machines of one label disagree on checksum string."""
32
class MissingLocksDirectory(Exception):
  """Signals that the machine locks directory is missing or inaccessible."""
35
class CrosCommandError(Exception):
  """Signals that a command run on the DUT reported an error."""
38
class CrosMachine(object):
  """One ChromeOS device under test (DUT) and the facts cached about it.

  Constructing an instance probes the device through the command executer.
  When the device answers, /proc/meminfo, /proc/cpuinfo and a machine id are
  fetched and reduced to two MD5 digests:
    machine_checksum:    cpuinfo without clock-speed lines, plus the rounded
                         physical memory size.
    machine_id_checksum: digest of the machine id line(s).
  When the device does not answer, machine_checksum is None.
  """

  def __init__(self, name, chromeos_root, log_level, cmd_exec=None):
    """Record the machine's identity and probe its hardware.

    Args:
      name: machine name handed to the command executer.
      chromeos_root: chromeos_root handed to the command executer.
      log_level: log level used when a command executer has to be created.
      cmd_exec: optional command executer; one is created when omitted.
    """
    self.name = name
    self.image = None
    # We relate a dut with a label if we reimage the dut using label or we
    # detect at the very beginning that the dut is running this label.
    self.label = None
    self.checksum = None
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    self.log_level = log_level
    self.ce = cmd_exec or command_executer.GetCommandExecuter(
        log_level=self.log_level)
    # Give every probed attribute a value up front, so that an unreachable
    # machine is still a fully formed object instead of raising
    # AttributeError on first use.
    self.meminfo = ""
    self.cpuinfo = ""
    self.phys_kbytes = None
    self.checksum_string = ""
    self.machine_id = ""
    self.machine_checksum = None
    self.machine_id_checksum = None
    self.SetUpChecksumInfo()

  def SetUpChecksumInfo(self):
    """(Re)compute the hardware and machine-id checksums.

    Sets machine_checksum to None and leaves everything else untouched when
    the machine is not reachable.
    """
    if not self.IsReachable():
      self.machine_checksum = None
      return
    self._GetMemoryInfo()
    self._GetCPUInfo()
    self._ComputeMachineChecksumString()
    self._GetMachineID()
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
    self.machine_id_checksum = self._GetMD5Checksum(self.machine_id)

  def IsReachable(self):
    """Return True when running `ls` on the machine reports no error."""
    ret = self.ce.CrosRunCommand("ls",
                                 machine=self.name,
                                 chromeos_root=self.chromeos_root)
    return not ret

  def _RunOnMachine(self, command):
    """Run command on this machine and return (status, output)."""
    ret, out, _ = self.ce.CrosRunCommand(command, return_output=True,
                                         machine=self.name,
                                         chromeos_root=self.chromeos_root)
    return ret, out

  def _ParseMemoryInfo(self):
    """Derive phys_kbytes from the first line of self.meminfo.

    The second whitespace-separated field of the first line is taken as the
    usable memory in kbytes and rounded up to a multiple of a power of two.
    """
    line = self.meminfo.splitlines()[0]
    usable_kbytes = int(line.split()[1])
    # This code is from src/third_party/test/files/client/bin/base_utils.py
    # usable_kbytes is system's usable DRAM in kbytes,
    #   as reported by memtotal() from device /proc/meminfo memtotal
    #   after Linux deducts some percentage for system table overhead.
    # Undo the unknown actual deduction by rounding up
    #   to next small multiple of a big power-of-two
    #   eg  12GB - 5.1% gets rounded back up to 12GB
    # NOTE(review): the original comments quote a 1.5% .. 9.5% deduction
    #   range, but the lower bound used here is 0.5% - confirm which one is
    #   intended before changing it (it affects every machine checksum).
    mindeduct = 0.005  # 0.5 percent
    maxdeduct = 0.095  # 9.5 percent
    # Finer granularity in physical mem sizes would require
    #   tighter spread between min and max possible deductions

    # increase mem size by at least min deduction, without rounding
    min_kbytes = int(usable_kbytes / (1.0 - mindeduct))
    # increase mem size further by 2**n rounding, by 0..roundKb or more
    round_kbytes = int(usable_kbytes / (1.0 - maxdeduct)) - min_kbytes
    # find least binary roundup 2**n that covers worst-case roundKb
    mod2n = 1 << int(math.ceil(math.log(round_kbytes, 2)))
    # have round_kbytes <= mod2n < round_kbytes*2
    # round min_kbytes up to next multiple of mod2n
    phys_kbytes = min_kbytes + mod2n - 1
    phys_kbytes -= phys_kbytes % mod2n  # clear low bits
    self.phys_kbytes = phys_kbytes

  def _GetMemoryInfo(self):
    """Fetch /proc/meminfo into self.meminfo and parse it.

    Raises:
      AssertionError: the command failed or produced no output (for example
        while the machine is rebooting).
    """
    ret, self.meminfo = self._RunOnMachine("cat /proc/meminfo")
    # Raised explicitly rather than with `assert`, so the check survives -O.
    if ret != 0 or not self.meminfo or not self.meminfo.strip():
      raise AssertionError("Could not get meminfo from machine: %s"
                           % self.name)
    self._ParseMemoryInfo()

  #cpuinfo format is different across architecture
  #need to find a better way to parse it.
  def _ParseCPUInfo(self, cpuinfo):
    """Placeholder parser; currently ignores cpuinfo and returns 0."""
    return 0

  def _GetCPUInfo(self):
    """Fetch /proc/cpuinfo into self.cpuinfo.

    Raises:
      AssertionError: the command failed.
    """
    ret, self.cpuinfo = self._RunOnMachine("cat /proc/cpuinfo")
    # Raised explicitly rather than with `assert`, so the check survives -O.
    if ret != 0:
      raise AssertionError("Could not get cpuinfo from machine: %s"
                           % self.name)
    self._ParseCPUInfo(self.cpuinfo)

  def _ComputeMachineChecksumString(self):
    """Build checksum_string from cpuinfo and phys_kbytes.

    cpuinfo lines mentioning MHz/BogoMIPS/bogomips are dropped; the remaining
    lines are concatenated without separators, followed by a space and
    phys_kbytes.
    """
    exclude_lines_list = ["MHz", "BogoMIPS", "bogomips"]
    kept_lines = [line for line in self.cpuinfo.splitlines()
                  if not any(e in line for e in exclude_lines_list)]
    self.checksum_string = "".join(kept_lines) + " " + str(self.phys_kbytes)

  def _GetMD5Checksum(self, ss):
    """Return the MD5 hex digest of ss, or "" when ss is empty/None."""
    if not ss:
      return ""
    # hashlib only accepts bytes on Python 3; text is encoded first.  On
    # Python 2 a plain str is already bytes and is passed through unchanged.
    if isinstance(ss, type(u"")):
      ss = ss.encode("utf-8")
    return hashlib.md5(ss).hexdigest()

  def _GetMachineID(self):
    """Set self.machine_id from the VPD log, falling back to ifconfig.

    The first "Product" line of `dump_vpd_log --full --stdout` wins.
    Otherwise all ifconfig lines containing "HWaddr" (or, failing that,
    "ether") are joined with "_".

    Raises:
      AssertionError: none of the sources yielded an id.
    """
    _, vpd_out = self._RunOnMachine("dump_vpd_log --full --stdout")
    product_lines = [l for l in vpd_out.splitlines() if "Product" in l]
    if product_lines:
      self.machine_id = product_lines[0]
      return
    _, if_out = self._RunOnMachine("ifconfig")
    if_lines = if_out.splitlines()
    for marker in ("HWaddr", "ether"):
      matches = [l for l in if_lines if marker in l]
      if matches:
        self.machine_id = "_".join(matches)
        return
    # Raised explicitly rather than with `assert 0`, so the check survives -O.
    raise AssertionError("Could not get machine_id from machine: %s"
                         % self.name)

  def __str__(self):
    """Return "name, image, checksum, locked, released_time"."""
    fields = [self.name,
              str(self.image),
              str(self.checksum),
              str(self.locked),
              str(self.released_time)]
    return ", ".join(fields)
179
180
class MachineManager(object):
  """Lock, image and unlock machines locally for benchmark runs.

  This class contains methods and calls to lock, unlock and image
  machines and distribute machines to each benchmark run.  The assumption is
  that all of the machines for the experiment have been globally locked
  (using an AFE server) in the ExperimentRunner, but the machines still need
  to be locally locked/unlocked (allocated to benchmark runs) to prevent
  multiple benchmark runs within the same experiment from trying to use the
  same machine at the same time.
  """

  def __init__(self, chromeos_root, acquire_timeout, log_level, locks_dir,
               cmd_exec=None, lgr=None):
    """Set up an empty manager.

    Args:
      chromeos_root: default chromeos_root for commands run on machines.
      acquire_timeout: seconds AcquireMachine may spend waiting in total;
        it is decremented as time is spent.
      log_level: log level; several messages are only emitted when it is not
        "verbose".
      locks_dir: directory for file locks; a false value disables file
        locking.
      cmd_exec: optional command executer; one is created when omitted.
      lgr: optional logger; the default logger is used when omitted.

    Raises:
      MissingLocksDirectory: locks_dir is set but is not a directory.
    """
    self._lock = threading.RLock()
    self._all_machines = []
    self._machines = []
    self.image_lock = threading.Lock()
    self.num_reimages = 0
    self.chromeos_root = chromeos_root
    self.machine_checksum = {}
    self.machine_checksum_string = {}
    self.acquire_timeout = acquire_timeout
    self.log_level = log_level
    self.locks_dir = locks_dir
    self.ce = cmd_exec or command_executer.GetCommandExecuter(
        log_level=self.log_level)
    self.logger = lgr or logger.GetLogger()

    if self.locks_dir and not os.path.isdir(self.locks_dir):
      raise MissingLocksDirectory("Cannot access locks directory: %s"
                                   % self.locks_dir)

    self._initialized_machines = []

  def RemoveNonLockedMachines(self, locked_machines):
    """Forget every machine whose name is not in locked_machines."""
    # Filter into a new list and assign it back in place: calling remove()
    # on a list while iterating over it skips the element that follows each
    # removal, and slice assignment keeps existing references to the lists
    # valid.
    self._all_machines[:] = [m for m in self._all_machines
                             if m.name in locked_machines]
    self._machines[:] = [m for m in self._machines
                         if m.name in locked_machines]

  def GetChromeVersion(self, machine):
    """Get the version of Chrome running on the DUT.

    Raises:
      CrosCommandError: the version command failed on the machine.
    """
    cmd = "/opt/google/chrome/chrome --version"
    ret, version, _ = self.ce.CrosRunCommand(cmd, return_output=True,
                                             machine=machine.name,
                                             chromeos_root=self.chromeos_root)
    if ret != 0:
      raise CrosCommandError("Couldn't get Chrome version from %s."
                             % machine.name)
    return version.rstrip()

  def _PushImage(self, image_chromeos_args):
    """Log (unless verbose) and run one image_chromeos.DoImage attempt."""
    if self.log_level != "verbose":
      self.logger.LogOutput("Pushing image onto machine.")
      self.logger.LogOutput("Running image_chromeos.DoImage with %s"
                            % " ".join(image_chromeos_args))
    return image_chromeos.DoImage(image_chromeos_args)

  def ImageMachine(self, machine, label):
    """Put label's image on machine unless it is already there.

    A failed first attempt is followed by a reboot, a 60 second wait and one
    retry.

    Returns:
      None when the machine already carries the label's checksum, otherwise
      the (false) result of the successful DoImage call.

    Raises:
      Exception: both imaging attempts failed.
      CrosCommandError: the Chrome version could not be read afterwards.
    """
    checksum = label.checksum

    if checksum and (machine.checksum == checksum):
      return
    chromeos_root = label.chromeos_root or self.chromeos_root
    image_chromeos_args = [image_chromeos.__file__,
                           "--no_lock",
                           "--chromeos_root=%s" % chromeos_root,
                           "--image=%s" % label.chromeos_image,
                           "--image_args=%s" % label.image_args,
                           "--remote=%s" % machine.name,
                           "--logging_level=%s" % self.log_level]
    if label.board:
      image_chromeos_args.append("--board=%s" % label.board)

    save_ce_log_level = self.ce.log_level
    if self.log_level != "verbose":
      self.ce.log_level = "average"

    # try/finally so the executer's log level is restored even when imaging
    # or the version query raises.
    try:
      # Currently can't image two machines at once.
      # So have to serialized on this lock.
      with self.image_lock:
        retval = self._PushImage(image_chromeos_args)
        if retval:
          cmd = "reboot && exit"
          if self.log_level != "verbose":
            self.logger.LogOutput("reboot & exit.")
          self.ce.CrosRunCommand(cmd, machine=machine.name,
                                 chromeos_root=self.chromeos_root)
          time.sleep(60)
          retval = self._PushImage(image_chromeos_args)
        if retval:
          raise Exception("Could not image machine: '%s'." % machine.name)
        self.num_reimages += 1
        machine.checksum = checksum
        machine.image = label.chromeos_image
        machine.label = label

      if not label.chrome_version:
        label.chrome_version = self.GetChromeVersion(machine)
    finally:
      self.ce.log_level = save_ce_log_level
    return retval

  def ComputeCommonCheckSum(self, label):
    """Store the machine checksum shared by all machines of label.

    Raises:
      BadChecksum: two machines of the label have different checksums.
    """
    # Since this is used for cache lookups before the machines have been
    # compared/verified, check here to make sure they all have the same
    # checksum (otherwise the cache lookup may not be valid).
    common_checksum = None
    for machine in self.GetMachines(label):
      # Make sure the machine's checksums are calculated.
      if not machine.machine_checksum:
        machine.SetUpChecksumInfo()
      cs = machine.machine_checksum
      # If this is the first machine we've examined, initialize
      # common_checksum.
      if not common_checksum:
        common_checksum = cs
      # Make sure this machine's checksum matches our 'common' checksum.
      if cs != common_checksum:
        raise BadChecksum("Machine checksums do not match!")
    self.machine_checksum[label.name] = common_checksum

  def ComputeCommonCheckSumString(self, label):
    """Store the first non-empty checksum string among label's machines."""
    # The assumption is that this function is only called AFTER
    # ComputeCommonCheckSum, so there is no need to verify the machines
    # are the same here.  If this is ever changed, this function should be
    # modified to verify that all the machines for a given label are the
    # same.
    for machine in self.GetMachines(label):
      if machine.checksum_string:
        self.machine_checksum_string[label.name] = machine.checksum_string
        break

  def _TryToLockMachine(self, cros_machine):
    """File-lock cros_machine (if locking is enabled) and start tracking it.

    Does nothing when a machine of that name is already tracked.  On success
    the machine's image checksum is read from CHECKSUM_FILE on the machine.
    """
    with self._lock:
      assert cros_machine, "Machine can't be None"
      if any(m.name == cros_machine.name for m in self._machines):
        return
      locked = True
      if self.locks_dir:
        locked = file_lock_machine.Machine(cros_machine.name,
                                           self.locks_dir).Lock(True,
                                                                sys.argv[0])
      if not locked:
        # Only reachable when locks_dir is set: without it locked stays True.
        self.logger.LogOutput("Couldn't lock: %s" % cros_machine.name)
        return
      self._machines.append(cros_machine)
      command = "cat %s" % CHECKSUM_FILE
      ret, out, _ = self.ce.CrosRunCommand(
          command, return_output=True, chromeos_root=self.chromeos_root,
          machine=cros_machine.name)
      if ret == 0:
        cros_machine.checksum = out.strip()

  # This is called from single threaded mode.
  def AddMachine(self, machine_name):
    """Probe machine_name and register it if it produced a checksum."""
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name

      # Logged once per added machine; inside the loop above it would be
      # repeated for every known machine and skipped for the first one.
      if self.log_level != "verbose":
        self.logger.LogOutput("Setting up remote access to %s"
                              % machine_name)
        self.logger.LogOutput("Checking machine characteristics for %s"
                              % machine_name)
      cm = CrosMachine(machine_name, self.chromeos_root, self.log_level)
      if cm.machine_checksum:
        self._all_machines.append(cm)

  def RemoveMachine(self, machine_name):
    """Stop tracking machine_name and release its file lock, if any."""
    with self._lock:
      self._machines = [m for m in self._machines
                        if m.name != machine_name]
      if self.locks_dir:
        res = file_lock_machine.Machine(machine_name,
                                        self.locks_dir).Unlock(True)
        if not res:
          self.logger.LogError("Could not unlock machine: '%s'."
                               % machine_name)

  def ForceSameImageToAllMachines(self, label):
    """Image every machine of label and refresh its checksum info."""
    for m in self.GetMachines(label):
      self.ImageMachine(m, label)
      m.SetUpChecksumInfo()

  def _ClaimMachine(self, machine):
    """Mark machine as taken by the current thread and return it."""
    machine.locked = True
    machine.test_run = threading.current_thread()
    return machine

  def AcquireMachine(self, chromeos_image, label, throw=False):
    """Hand out an unlocked machine of label to the calling thread.

    Preference order: a machine already carrying label's image checksum, then
    a machine with no image checksum, then any machine released between 8
    and 15 seconds ago.

    Args:
      chromeos_image: unused.
      label: label whose machines are eligible.
      throw: unused.

    Returns:
      The claimed machine, or None when no machine qualifies right now.
    """
    image_checksum = label.checksum
    machines = self.GetMachines(label)
    check_interval_time = 120
    with self._lock:
      # Lazily external lock machines
      while self.acquire_timeout >= 0:
        for m in machines:
          # NOTE(review): machines comes from _all_machines, so this looks
          # like it is always False - confirm whether _machines was meant.
          new_machine = m not in self._all_machines
          self._TryToLockMachine(m)
          if new_machine:
            m.released_time = time.time()
        if self.GetAvailableMachines(label):
          break
        sleep_time = max(1, min(self.acquire_timeout, check_interval_time))
        time.sleep(sleep_time)
        self.acquire_timeout -= sleep_time

      if self.acquire_timeout < 0:
        machine_names = [machine.name for machine in machines]
        self.logger.LogFatal("Could not acquire any of the "
                             "following machines: '%s'"
                             % ", ".join(machine_names))

      # Nothing below changes a machine's locked flag without returning, so
      # the candidate list only needs to be built once.
      candidates = [machine for machine in self.GetAvailableMachines(label)
                    if not machine.locked]
      for m in candidates:
        if image_checksum and (m.checksum == image_checksum):
          return self._ClaimMachine(m)
      for m in candidates:
        if not m.checksum:
          return self._ClaimMachine(m)
      # This logic ensures that threads waiting on a machine will get a machine
      # with a checksum equal to their image over other threads. This saves time
      # when crosperf initially assigns the machines to threads by minimizing
      # the number of re-images.
      # TODO(asharif): If we centralize the thread-scheduler, we wont need this
      # code and can implement minimal reimaging code more cleanly.
      for m in candidates:
        idle_time = time.time() - m.released_time
        if idle_time > 15:
          # The release time gap is too large, so it is probably in the start
          # stage, we need to reset the released_time.
          m.released_time = time.time()
        elif idle_time > 8:
          return self._ClaimMachine(m)
    return None

  def GetAvailableMachines(self, label=None):
    """Return tracked machines, restricted to label.remote when given."""
    if not label:
      return self._machines
    return [m for m in self._machines if m.name in label.remote]

  def GetMachines(self, label=None):
    """Return all known machines, restricted to label.remote when given."""
    if not label:
      return self._all_machines
    return [m for m in self._all_machines if m.name in label.remote]

  def ReleaseMachine(self, machine):
    """Mark the tracked machine named like machine as free again."""
    with self._lock:
      for m in self._machines:
        if machine.name == m.name:
          assert m.locked == True, "Tried to double-release %s" % m.name
          m.released_time = time.time()
          m.locked = False
          m.status = "Available"
          break

  def Cleanup(self):
    """Release the file lock of every tracked machine."""
    with self._lock:
      # Without a locks directory no file lock was ever taken (see
      # _TryToLockMachine), so there is nothing to unlock.
      if not self.locks_dir:
        return
      # Unlock all machines (via file lock)
      for m in self._machines:
        res = file_lock_machine.Machine(m.name, self.locks_dir).Unlock(True)

        if not res:
          self.logger.LogError("Could not unlock machine: '%s'."
                               % m.name)

  def __str__(self):
    """Return a header line followed by one line per tracked machine."""
    with self._lock:
      lines = ["MachineManager Status:"]
      lines.extend(str(m) for m in self._machines)
      return "\n".join(lines)

  def AsString(self):
    """Return a table of tracked machines with thread, lock and checksum."""
    with self._lock:
      stringify_fmt = "%-30s %-10s %-4s %-25s %-32s"
      header = stringify_fmt % ("Machine", "Thread", "Lock", "Status",
                                "Checksum")
      table = [header]
      for m in self._machines:
        if m.test_run:
          # Assumes test_run exposes .name and .timeline.GetLastEvent().
          test_name = m.test_run.name
          test_status = m.test_run.timeline.GetLastEvent()
        else:
          test_name = ""
          test_status = ""

        try:
          machine_string = stringify_fmt % (m.name,
                                            test_name,
                                            m.locked,
                                            test_status,
                                            m.checksum)
        except ValueError:
          machine_string = ""
        table.append(machine_string)
      return "Machine Status:\n%s" % "\n".join(table)

  def GetAllCPUInfo(self, labels):
    """Get cpuinfo for labels, merge them if their cpuinfo are the same."""
    labels_by_cpuinfo = {}
    for label in labels:
      # Only the first known machine of each label is consulted.
      for machine in self._all_machines:
        if machine.name in label.remote:
          labels_by_cpuinfo.setdefault(machine.cpuinfo, []).append(label.name)
          break
    return "".join("%s\n-------------------\n%s\n\n\n"
                   % (" ".join(names), cpuinfo)
                   for cpuinfo, names in labels_by_cpuinfo.items())
521
522
class MockCrosMachine(CrosMachine):
  """CrosMachine stand-in for tests that never contacts a device.

  The parent constructor is deliberately not called (it would probe the
  machine); the attributes are filled in directly instead.
  """

  def __init__(self, name, chromeos_root, log_level):
    self.name = name
    self.image = None
    self.checksum = None
    self.locked = False
    self.released_time = time.time()
    self.test_run = None
    self.chromeos_root = chromeos_root
    # Raw string: "\d" in a plain literal is an invalid escape sequence.
    self.checksum_string = re.sub(r"\d", "", name)
    #In test, we assume "lumpy1", "lumpy2" are the same machine.
    self.machine_checksum = self._GetMD5Checksum(self.checksum_string)
    self.log_level = log_level
    self.label = None

  def IsReachable(self):
    """Mock machines are always reachable."""
    return True
540
541
class MockMachineManager(MachineManager):
  """MachineManager stand-in for tests: no file locks, no imaging."""

  def __init__(self, chromeos_root, acquire_timeout, log_level, dummy_locks_dir):
    """Initialise the parent with file_lock_machine.Machine.LOCKS_DIR.

    dummy_locks_dir is accepted for signature compatibility and ignored.
    """
    super(MockMachineManager, self).__init__(chromeos_root, acquire_timeout,
                                             log_level,
                                             file_lock_machine.Machine.LOCKS_DIR)

  def _TryToLockMachine(self, cros_machine):
    """Track cros_machine unconditionally and give it an empty checksum."""
    self._machines.append(cros_machine)
    cros_machine.checksum = ""

  def AddMachine(self, machine_name):
    """Register a MockCrosMachine named machine_name."""
    with self._lock:
      for m in self._all_machines:
        assert m.name != machine_name, "Tried to double-add %s" % machine_name
      cm = MockCrosMachine(machine_name, self.chromeos_root, self.log_level)
      assert cm.machine_checksum, ("Could not find checksum for machine %s" %
                                   machine_name)
      # In Original MachineManager, the test is 'if cm.machine_checksum:' - if a
      # machine is unreachable, then its machine_checksum is None. Here we
      # cannot do this, because machine_checksum is always faked, so we directly
      # test cm.IsReachable, which is properly mocked.
      if cm.IsReachable():
        self._all_machines.append(cm)

  def AcquireMachine(self, chromeos_image, label, throw=False):
    """Lock and return the first unlocked machine, or None if there is none."""
    for machine in self._all_machines:
      if not machine.locked:
        machine.locked = True
        return machine
    return None

  def ImageMachine(self, machine_name, label):
    """Pretend imaging succeeded; always returns 0."""
    return 0

  def ReleaseMachine(self, machine):
    """Mark machine as unlocked."""
    machine.locked = False

  # label defaults to None like in MachineManager, so that the mock accepts
  # every call the real class accepts.
  def GetMachines(self, label=None):
    """Return all machines; label is ignored."""
    return self._all_machines

  def GetAvailableMachines(self, label=None):
    """Return all machines; label is ignored."""
    return self._all_machines
585