experiment.py revision dd4176150697727cdc89574698fd53f556c1304d
1#!/usr/bin/python
2
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""The experiment setting module."""
8
9import os
10import time
11
12import afe_lock_machine
13from threading import Lock
14
15from utils import logger
16from utils import misc
17
18import benchmark_run
19from machine_manager import BadChecksum
20from machine_manager import MachineManager
21from machine_manager import MockMachineManager
22import test_flag
23
24
class Experiment(object):
  """Class representing an Experiment to be run.

  An experiment owns a machine manager holding the reachable remote machines
  and one BenchmarkRun per (label, benchmark, iteration) combination. The
  runs are driven either by a scheduler installed through set_schedv2() or,
  when none is installed, by starting every run directly (see Run()).
  """

  def __init__(self, name, remote, working_directory,
               chromeos_root, cache_conditions, labels, benchmarks,
               experiment_file, email_to, acquire_timeout, log_dir,
               log_level, share_cache, results_directory, locks_directory):
    """Validate the settings, register the machines and create the runs.

    Args:
      name: Experiment name; also used to derive the default results
        directory.
      remote: Iterable of remote machines to add to the machine manager.
      working_directory: Base directory for the default results directory.
      chromeos_root: ChromeOS root handed to the machine manager. If empty,
        the first non-empty label.chromeos_root is used instead.
      cache_conditions: Passed through to every BenchmarkRun.
      labels: Labels to run the benchmarks against.
      benchmarks: Benchmarks to run; each provides name and iterations.
      experiment_file: Stored on the instance.
      email_to: Stored on the instance.
      acquire_timeout: Passed through to the machine manager.
      log_dir: Directory handed to the loggers.
      log_level: Passed to the machine manager and to every BenchmarkRun.
      share_cache: Passed through to every BenchmarkRun.
      results_directory: Where results go; if empty, defaults to
        <working_directory>/<name>_results.
      locks_directory: Passed to the machine manager and kept as locks_dir.

    Raises:
      RuntimeError: If remote, benchmarks or labels is empty, or if no
        chromeos_root was given and none of the labels provides one.
    """
    self.name = name
    self.working_directory = working_directory
    self.remote = remote
    self.chromeos_root = chromeos_root
    self.cache_conditions = cache_conditions
    self.experiment_file = experiment_file
    self.email_to = email_to
    if not results_directory:
      self.results_directory = os.path.join(self.working_directory,
                                            self.name + "_results")
    else:
      self.results_directory = misc.CanonicalizePath(results_directory)
    self.log_dir = log_dir
    self.log_level = log_level
    self.labels = labels
    self.benchmarks = benchmarks
    self.num_complete = 0
    self.num_run_complete = 0
    self.share_cache = share_cache
    # If locks_directory (self.locks_dir) is not blank, we will use the file
    # locking mechanism; if it is blank then we will use the AFE server
    # locking mechanism.
    self.locks_dir = locks_directory
    self.locked_machines = []
    # Runs started by Run() that have not been reaped by IsComplete() yet.
    # Created here so IsComplete() is safe to call before Run().
    self.active_threads = []

    if not remote:
      raise RuntimeError("No remote hosts specified")
    if not self.benchmarks:
      raise RuntimeError("No benchmarks specified")
    if not self.labels:
      raise RuntimeError("No labels specified")

    # We need one chromeos_root to run the benchmarks in, but it doesn't
    # matter where it is, unless the ABIs are different.
    if not chromeos_root:
      for label in self.labels:
        if label.chromeos_root:
          chromeos_root = label.chromeos_root
          break
    if not chromeos_root:
      raise RuntimeError("No chromeos_root given and could not determine "
                         "one from the image path.")

    if test_flag.GetTestMode():
      self.machine_manager = MockMachineManager(chromeos_root, acquire_timeout,
                                                log_level, locks_directory)
    else:
      self.machine_manager = MachineManager(chromeos_root, acquire_timeout,
                                            log_level, locks_directory)
    self.l = logger.GetLogger(log_dir)

    for machine in self.remote:
      # machine_manager.AddMachine only adds reachable machines.
      self.machine_manager.AddMachine(machine)
    # Now machine_manager._all_machines contains a list of reachable
    # machines. This is a subset of self.remote. We make both lists the same.
    self.remote = [m.name for m in self.machine_manager._all_machines]

    for label in labels:
      # We filter out label remotes that are not reachable (not in
      # self.remote). So each label.remote is a sublist of experiment.remote.
      # A list comprehension (rather than filter()) guarantees a real list on
      # both Python 2 and Python 3.
      label.remote = [r for r in label.remote if r in self.remote]
      try:
        self.machine_manager.ComputeCommonCheckSum(label)
      except BadChecksum:
        # Force same image on all machines, then we do checksum again. No
        # bailout if checksums still do not match.
        self.machine_manager.ForceSameImageToAllMachines(label)
        self.machine_manager.ComputeCommonCheckSum(label)

      self.machine_manager.ComputeCommonCheckSumString(label)

    self.start_time = None
    self.benchmark_runs = self._GenerateBenchmarkRuns()

    self._schedv2 = None
    # Guards num_complete / num_run_complete in BenchmarkRunFinished().
    self._internal_counter_lock = Lock()

  def set_schedv2(self, schedv2):
    """Install the scheduler that Run/Terminate/IsComplete delegate to."""
    self._schedv2 = schedv2

  def schedv2(self):
    """Return the installed scheduler, or None if there is none."""
    return self._schedv2

  def _GenerateBenchmarkRuns(self):
    """Generate benchmark runs from labels and benchmark definitions.

    Returns:
      A list with one BenchmarkRun per (label, benchmark, iteration), where
      iteration counts from 1 to benchmark.iterations inclusive.
    """
    benchmark_runs = []
    for label in self.labels:
      for benchmark in self.benchmarks:
        for iteration in range(1, benchmark.iterations + 1):

          benchmark_run_name = "%s: %s (%s)" % (label.name, benchmark.name,
                                                iteration)
          full_name = "%s_%s_%s" % (label.name, benchmark.name, iteration)
          logger_to_use = logger.Logger(self.log_dir,
                                        "run.%s" % (full_name),
                                        True)
          benchmark_runs.append(benchmark_run.BenchmarkRun(
              benchmark_run_name,
              benchmark,
              label,
              iteration,
              self.cache_conditions,
              self.machine_manager,
              logger_to_use,
              self.log_level,
              self.share_cache))

    return benchmark_runs

  def Build(self):
    """Do nothing; placeholder kept for interface compatibility."""
    pass

  def Terminate(self):
    """Stop the experiment.

    Delegates to the scheduler when one is installed; otherwise terminates
    every benchmark run that is still alive.
    """
    if self._schedv2 is not None:
      self._schedv2.terminate()
    else:
      for t in self.benchmark_runs:
        if t.isAlive():
          self.l.LogError("Terminating run: '%s'." % t.name)
          t.Terminate()

  def IsComplete(self):
    """Report whether the experiment has finished, reaping finished runs.

    With a scheduler installed the answer comes from the scheduler. Otherwise
    every finished run in self.active_threads is counted and removed from
    the list.

    Returns:
      The scheduler's is_complete() result when a scheduler is installed.
      Otherwise True only if no run was active when the call started; a call
      that reaps the last active run still returns False, and the next call
      returns True.
    """
    if self._schedv2 is not None:
      return self._schedv2.is_complete()
    if not self.active_threads:
      return True
    # Walk a snapshot: removing from the list being iterated would make the
    # iterator skip the element right after each removed run.
    for t in list(self.active_threads):
      if t.isAlive():
        t.join(0)
      if not t.isAlive():
        self.num_complete += 1
        if not t.cache_hit:
          self.num_run_complete += 1
        self.active_threads.remove(t)
    return False

  def BenchmarkRunFinished(self, br):
    """Update internal counters after br finishes.

    Note this is only used by schedv2 and is called by multiple threads.
    Never throw any exception here.

    Args:
      br: The finished benchmark run; only its cache_hit attribute is read.
    """
    assert self._schedv2 is not None
    with self._internal_counter_lock:
      self.num_complete += 1
      if not br.cache_hit:
        self.num_run_complete += 1

  def Run(self):
    """Record the start time and start the experiment.

    Delegates to the scheduler when one is installed; otherwise starts every
    benchmark run and tracks it in self.active_threads.
    """
    self.start_time = time.time()
    if self._schedv2 is not None:
      self._schedv2.run_sched()
    else:
      self.active_threads = []
      for run in self.benchmark_runs:
        # Set threads to daemon so program exits when ctrl-c is pressed.
        run.daemon = True
        run.start()
        self.active_threads.append(run)

  def SetCacheConditions(self, cache_conditions):
    """Forward cache_conditions to every benchmark run."""
    for run in self.benchmark_runs:
      run.SetCacheConditions(cache_conditions)

  def Cleanup(self):
    """Make sure all machines are unlocked."""
    if self.locks_dir:
      # We are using the file locks mechanism, so call machine_manager.Cleanup
      # to unlock everything.
      self.machine_manager.Cleanup()
    else:
      all_machines = self.locked_machines
      if not all_machines:
        return

      # If we locked any machines earlier, make sure we unlock them now.
      lock_mgr = afe_lock_machine.AFELockManager(all_machines, "",
                                                 self.labels[0].chromeos_root,
                                                 None)
      machine_states = lock_mgr.GetMachineStates("unlock")
      # items() instead of iteritems() so this works on Python 2 and 3.
      for k, state in machine_states.items():
        if state["locked"]:
          lock_mgr.UpdateLockInAFE(False, k)
217