1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""The experiment setting module."""
5
6from __future__ import print_function
7
8import os
9import time
10
11import afe_lock_machine
12from threading import Lock
13
14from cros_utils import logger
15from cros_utils import misc
16
17import benchmark_run
18from machine_manager import BadChecksum
19from machine_manager import MachineManager
20from machine_manager import MockMachineManager
21import test_flag
22
23
class Experiment(object):
  """Class representing an Experiment to be run.

  An experiment pairs every label with every benchmark and creates one
  BenchmarkRun per (label, benchmark, iteration). The runs are executed
  either by an attached schedv2 scheduler (see set_schedv2) or, when no
  scheduler is attached, by starting each run as a daemon thread.
  """

  def __init__(self, name, remote, working_directory, chromeos_root,
               cache_conditions, labels, benchmarks, experiment_file, email_to,
               acquire_timeout, log_dir, log_level, share_cache,
               results_directory, locks_directory):
    """Validate the configuration, register machines and generate the runs.

    Args:
      name: Experiment name; also used to derive the default results
        directory.
      remote: Remote machine names. After construction self.remote holds
        only the machines the machine manager actually added (reachable).
      working_directory: Base directory for the default results directory.
      chromeos_root: ChromeOS root handed to the machine manager. When
        empty, the first label with a non-empty chromeos_root supplies it.
      cache_conditions: Passed to every BenchmarkRun.
      labels: Label objects. Each label's `remote` list is filtered to the
        reachable machines and its common checksum (and checksum string)
        is computed through the machine manager.
      benchmarks: Benchmark objects, each exposing `name` and `iterations`.
      experiment_file: Stored as-is on the experiment.
      email_to: Stored as-is on the experiment.
      acquire_timeout: Passed to the machine manager.
      log_dir: Directory used for the experiment logger and per-run loggers.
      log_level: Passed to the machine manager and to every BenchmarkRun.
      share_cache: Passed to every BenchmarkRun.
      results_directory: Where results go. When empty it defaults to
        `<working_directory>/<name>_results`; otherwise it is canonicalized.
      locks_directory: When non-empty, file-based machine locking is used;
        when empty, AFE server locking is used.

    Raises:
      RuntimeError: If no remotes, benchmarks or labels are given, if no
        chromeos_root can be determined, or if no remote is reachable.
      BadChecksum: If a label's checksums still disagree after the same
        image has been forced onto all of its machines.
    """
    self.name = name
    self.working_directory = working_directory
    self.remote = remote
    self.chromeos_root = chromeos_root
    self.cache_conditions = cache_conditions
    self.experiment_file = experiment_file
    self.email_to = email_to
    if not results_directory:
      self.results_directory = os.path.join(self.working_directory,
                                            self.name + '_results')
    else:
      self.results_directory = misc.CanonicalizePath(results_directory)
    self.log_dir = log_dir
    self.log_level = log_level
    self.labels = labels
    self.benchmarks = benchmarks
    self.num_complete = 0
    self.num_run_complete = 0
    self.share_cache = share_cache
    self.active_threads = []
    # If locks_directory (self.locks_dir) is not blank, the file locking
    # mechanism is used; if it is blank, the AFE server locking mechanism
    # is used instead (see Cleanup).
    self.locks_dir = locks_directory
    self.locked_machines = []

    if not remote:
      raise RuntimeError('No remote hosts specified')
    if not self.benchmarks:
      raise RuntimeError('No benchmarks specified')
    if not self.labels:
      raise RuntimeError('No labels specified')

    # We need one chromeos_root to run the benchmarks in, but it doesn't
    # matter where it is, unless the ABIs are different.
    if not chromeos_root:
      for label in self.labels:
        if label.chromeos_root:
          chromeos_root = label.chromeos_root
          break
    if not chromeos_root:
      raise RuntimeError('No chromeos_root given and could not determine '
                         'one from the image path.')

    machine_manager_fn = MachineManager
    if test_flag.GetTestMode():
      machine_manager_fn = MockMachineManager
    self.machine_manager = machine_manager_fn(chromeos_root, acquire_timeout,
                                              log_level, locks_directory)
    self.l = logger.GetLogger(log_dir)

    for machine in self.remote:
      # machine_manager.AddMachine only adds reachable machines.
      self.machine_manager.AddMachine(machine)
    # The machine manager now holds only reachable machines, a subset of
    # self.remote. Make both lists the same.
    self.remote = [m.name for m in self.machine_manager.GetAllMachines()]
    if not self.remote:
      raise RuntimeError('No machine available for running experiment.')

    # Build the membership set once instead of scanning the list per remote.
    reachable = set(self.remote)
    for label in self.labels:
      # Drop label remotes that are not reachable, so each label.remote is
      # a sublist of self.remote.
      label.remote = [r for r in label.remote if r in reachable]
      try:
        self.machine_manager.ComputeCommonCheckSum(label)
      except BadChecksum:
        # Force the same image on all machines, then checksum again. No
        # further retry: a second BadChecksum propagates to the caller.
        self.machine_manager.ForceSameImageToAllMachines(label)
        self.machine_manager.ComputeCommonCheckSum(label)

      self.machine_manager.ComputeCommonCheckSumString(label)

    self.start_time = None
    self.benchmark_runs = self._GenerateBenchmarkRuns()

    self._schedv2 = None
    self._internal_counter_lock = Lock()

  def set_schedv2(self, schedv2):
    """Attach a schedv2 scheduler; run/terminate/completion then go via it."""
    self._schedv2 = schedv2

  def schedv2(self):
    """Return the attached schedv2 scheduler, or None if there is none."""
    return self._schedv2

  def _GenerateBenchmarkRuns(self):
    """Generate benchmark runs from labels and benchmark definitions.

    Returns:
      A list with one BenchmarkRun per (label, benchmark, iteration), where
      iteration ranges over 1..benchmark.iterations. Each run gets its own
      logger named 'run.<label>_<benchmark>_<iteration>'.
    """
    benchmark_runs = []
    for label in self.labels:
      for benchmark in self.benchmarks:
        for iteration in xrange(1, benchmark.iterations + 1):

          benchmark_run_name = '%s: %s (%s)' % (label.name, benchmark.name,
                                                iteration)
          full_name = '%s_%s_%s' % (label.name, benchmark.name, iteration)
          logger_to_use = logger.Logger(self.log_dir, 'run.%s' % (full_name),
                                        True)
          benchmark_runs.append(benchmark_run.BenchmarkRun(
              benchmark_run_name, benchmark, label, iteration,
              self.cache_conditions, self.machine_manager, logger_to_use,
              self.log_level, self.share_cache))

    return benchmark_runs

  def Build(self):
    """No-op hook; this experiment has nothing to build."""
    pass

  def Terminate(self):
    """Stop the experiment.

    Delegates to the scheduler's terminate() when schedv2 is attached.
    Otherwise every benchmark-run thread that is still alive is logged and
    terminated.
    """
    if self._schedv2 is not None:
      self._schedv2.terminate()
    else:
      for t in self.benchmark_runs:
        if t.isAlive():
          self.l.LogError("Terminating run: '%s'." % t.name)
          t.Terminate()

  def IsComplete(self):
    """Return whether every benchmark run has finished.

    With schedv2 attached, the scheduler's is_complete() result is returned.
    Otherwise finished threads are reaped from self.active_threads, which
    updates num_complete (all finished runs) and num_run_complete (finished
    runs that were not cache hits). A call that found tracked threads
    returns False even if it reaped all of them; the next call returns True.
    """
    if self._schedv2 is not None:
      return self._schedv2.is_complete()
    if not self.active_threads:
      return True
    # Iterate over a snapshot: removing items from the list being iterated
    # would skip the thread right after each removed one.
    for t in list(self.active_threads):
      if t.isAlive():
        t.join(0)
      if not t.isAlive():
        self.num_complete += 1
        if not t.cache_hit:
          self.num_run_complete += 1
        self.active_threads.remove(t)
    return False

  def BenchmarkRunFinished(self, br):
    """Update internal counters after br finishes.

    Note this is only used by schedv2 and is called by multiple threads, so
    the counters are updated under _internal_counter_lock. Never throw any
    exception here.
    """

    assert self._schedv2 is not None
    with self._internal_counter_lock:
      self.num_complete += 1
      if not br.cache_hit:
        self.num_run_complete += 1

  def Run(self):
    """Record the start time and start executing the benchmark runs.

    With schedv2 attached, the scheduler's run_sched() is invoked. Otherwise
    each benchmark run is started as a daemon thread and tracked in
    self.active_threads.
    """
    self.start_time = time.time()
    if self._schedv2 is not None:
      self._schedv2.run_sched()
    else:
      self.active_threads = []
      for run in self.benchmark_runs:
        # Set threads to daemon so program exits when ctrl-c is pressed.
        run.daemon = True
        run.start()
        self.active_threads.append(run)

  def SetCacheConditions(self, cache_conditions):
    """Apply cache_conditions to every generated benchmark run."""
    for run in self.benchmark_runs:
      run.SetCacheConditions(cache_conditions)

  def Cleanup(self):
    """Make sure all machines are unlocked.

    With file-based locking (self.locks_dir set), the machine manager's
    Cleanup() releases everything. With AFE locking, any machine recorded in
    self.locked_machines that the AFE still reports as locked is unlocked.
    Nothing is done for AFE locking in test mode.
    """
    if self.locks_dir:
      # We are using the file locks mechanism, so call machine_manager.Cleanup
      # to unlock everything.
      self.machine_manager.Cleanup()
    else:
      if test_flag.GetTestMode():
        return

      all_machines = self.locked_machines
      if not all_machines:
        return

      # If we locked any machines earlier, make sure we unlock them now.
      lock_mgr = afe_lock_machine.AFELockManager(
          all_machines, '', self.labels[0].chromeos_root, None)
      machine_states = lock_mgr.GetMachineStates('unlock')
      for k, state in machine_states.iteritems():
        if state['locked']:
          lock_mgr.UpdateLockInAFE(False, k)
213