experiment_runner.py revision 7057cf67ba1dbdd4387f53e5fe47b43c955b1a53
1# Copyright 2011-2015 Google Inc. All Rights Reserved.
2
3"""The experiment runner module."""
4import getpass
5import os
6import shutil
7import time
8
9import afe_lock_machine
10import test_flag
11
12from cros_utils import command_executer
13from cros_utils import logger
14from cros_utils.email_sender import EmailSender
15from cros_utils.file_utils import FileUtils
16
17import config
18from experiment_status import ExperimentStatus
19from results_cache import CacheConditions
20from results_cache import ResultsCache
21from results_report import HTMLResultsReport
22from results_report import TextResultsReport
23from results_report import JSONResultsReport
24from schedv2 import Schedv2
25
26
class ExperimentRunner(object):
  """Runs a crosperf experiment and reports on its results.

  The runner locks the requested machines (unless the experiment uses a
  locks_dir), starts the experiment, periodically logs its status until it
  completes, unlocks the machines, and finally prints, stores and e-mails
  the results.
  """

  # Seconds between two status reports while the experiment is running.
  # Lowered to 10 in __init__ when the log level is not "verbose".
  STATUS_TIME_DELAY = 30
  # Seconds to sleep between two checks for experiment completion.
  THREAD_MONITOR_DELAY = 2

  def __init__(self, experiment, json_report, using_schedv2=False, log=None,
               cmd_exec=None):
    """Initializes the runner for a single experiment.

    Args:
      experiment: The experiment object to run.
      json_report: If true, a JSON report is also written when the results
        are stored.
      using_schedv2: If true, use crosperf sched v2 (feature in progress).
      log: Logger to use. Defaults to a logger created for
        experiment.log_dir.
      cmd_exec: Command executer to use. Defaults to one created from the
        logger.
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    # Set when the run is interrupted (Ctrl-C / SystemExit); suppresses
    # storing and e-mailing of results.
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != "verbose":
      # Shadows the class-level default for this instance only.
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Returns the list of all requested machines.

    Every label-specific request must also appear in the global request
    list, so the global list already covers all requested machines.

    Returns:
      The experiment's own `remote` list (not a copy).

    Raises:
      AssertionError: If a label requests a machine that is not part of the
        experiment's global machine list.
    """
    machines = self._experiment.remote
    # All Label.remote is a sublist of experiment.remote.
    for label in self._experiment.labels:
      for remote in label.remote:
        assert remote in machines
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Updates machine lists to contain only locked machines.

    Goes through all the lists of requested machines, both global and
    label-specific requests, and removes any machine that we were not
    able to lock.

    The lists are filtered in place (slice assignment) instead of calling
    remove() while iterating: removing during iteration makes the iterator
    skip the element following each removed one, which left unlocked
    machines in the lists. In-place filtering also keeps the list objects
    intact for any other holder of a reference to them.

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    remote = self._experiment.remote
    remote[:] = [m for m in remote if m in locked_machines]

    for label in self._experiment.labels:
      label.remote[:] = [m for m in label.remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempts to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.
    In test mode no lock manager is used and every requested machine is
    treated as locked.

    Args:
      experiment: The experiment whose first label supplies the chromeos_root
        handed to the lock manager.

    Raises:
      RuntimeError: If no machine could be locked.
    """
    if test_flag.GetTestMode():
      self.locked_machines = self._GetMachineList()
      self._experiment.locked_machines = self.locked_machines
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self._GetMachineList(),
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    # Machines the lock manager does not know yet are registered as local
    # machines before locking.
    for m in lock_mgr.machines:
      if not lock_mgr.MachineIsKnown(m):
        lock_mgr.AddLocalMachine(m)
    machine_states = lock_mgr.GetMachineStates("lock")
    lock_mgr.CheckMachineLocks(machine_states, "lock")
    self.locked_machines = lock_mgr.UpdateMachines(True)
    self._experiment.locked_machines = self.locked_machines
    # Drop every machine we failed to lock from the experiment, its labels
    # and the machine manager.
    self._UpdateMachineList(self.locked_machines)
    self._experiment.machine_manager.RemoveNonLockedMachines(
        self.locked_machines)
    if not self.locked_machines:
      raise RuntimeError("Unable to lock any machines.")

  def _UnlockAllMachines(self, experiment):
    """Attempts to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock all of the machines
    that were locked for this crosperf run. It does nothing in test mode or
    when no machine is recorded as locked.

    Args:
      experiment: The experiment whose first label supplies the chromeos_root
        handed to the lock manager.
    """
    if not self.locked_machines or test_flag.GetTestMode():
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    machine_states = lock_mgr.GetMachineStates("unlock")
    lock_mgr.CheckMachineLocks(machine_states, "unlock")
    lock_mgr.UpdateMachines(False)

  def _ClearCacheEntries(self, experiment):
    """Removes the on-disk cache directory of every benchmark run.

    Args:
      experiment: The experiment whose benchmark runs' cache directories are
        deleted (only those that exist).
    """
    for br in experiment.benchmark_runs:
      cache = ResultsCache()
      cache.Init(br.label.chromeos_image, br.label.chromeos_root,
                 br.benchmark.test_name, br.iteration, br.test_args,
                 br.profiler_args, br.machine_manager, br.machine,
                 br.label.board, br.cache_conditions, br._logger, br.log_level,
                 br.label, br.share_cache, br.benchmark.suite,
                 br.benchmark.show_all_results, br.benchmark.run_local)
      cache_dir = cache._GetCacheDirForWrite()
      if os.path.exists(cache_dir):
        self.l.LogOutput("Removing cache dir: %s" % cache_dir)
        shutil.rmtree(cache_dir)

  def _Run(self, experiment):
    """Runs the experiment and logs its status until it completes.

    Machines are locked through AFE only when the experiment has no
    locks_dir, and in that case they are always unlocked again on the way
    out, whether the run succeeded or not.

    Args:
      experiment: The experiment to run.

    Raises:
      KeyboardInterrupt, SystemExit: Re-raised after the experiment has been
        terminated and the runner has been marked as terminated.
      RuntimeError: If no machine could be locked.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      # A FALSE cache condition means existing cache entries are discarded
      # before running.
      if CacheConditions.FALSE in experiment.cache_conditions:
        self._ClearCacheEntries(experiment)
      status = ExperimentStatus(experiment)
      experiment.Run()
      last_status_time = 0
      last_status_string = ""
      try:
        if experiment.log_level != "verbose":
          self.l.LogStartDots()
        while not experiment.IsComplete():
          # Report status at most once every STATUS_TIME_DELAY seconds.
          if last_status_time + self.STATUS_TIME_DELAY < time.time():
            last_status_time = time.time()
            border = "=============================="
            if experiment.log_level == "verbose":
              self.l.LogOutput(border)
              self.l.LogOutput(status.GetProgressString())
              self.l.LogOutput(status.GetStatusString())
              self.l.LogOutput(border)
            else:
              # Non-verbose: print the status only when it changed,
              # otherwise just append a progress dot.
              current_status_string = status.GetStatusString()
              if current_status_string != last_status_string:
                self.l.LogEndDots()
                self.l.LogOutput(border)
                self.l.LogOutput(current_status_string)
                self.l.LogOutput(border)
                last_status_string = current_status_string
              else:
                self.l.LogAppendDot()
          time.sleep(self.THREAD_MONITOR_DELAY)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError("Ctrl-c pressed. Cleaning up...")
        experiment.Terminate()
        raise
      except SystemExit:
        self._terminated = True
        self.l.LogError("Unexpected exit. Cleaning up...")
        experiment.Terminate()
        raise
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _PrintTable(self, experiment):
    """Logs the text results report of the experiment."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _GetEmailBody(self, experiment):
    """Returns the HTML e-mail body: the text report wrapped in a <pre> tag.

    Args:
      experiment: The experiment to report on.

    Returns:
      A string with the e-mail flavoured text report followed by the
      location of the results directory.
    """
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    return "<pre style='font-size: 13px'>%s</pre>" % text_report

  def _Email(self, experiment):
    """E-mails the results report to the current user and email_to.

    No mail is sent when the "no_email" config option is set, or when every
    benchmark run was a cache hit and the experiment has no explicit
    recipients.

    Args:
      experiment: The experiment to report on.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not benchmark_run.cache_hit
                    for benchmark_run in experiment.benchmark_runs)
    if ((not send_mail and not experiment.email_to) or
        config.GetConfig("no_email")):
      return

    label_names = [label.name for label in experiment.labels]
    subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

    text_report = self._GetEmailBody(experiment)
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment("report.html", html_report)
    email_to = [getpass.getuser()] + experiment.email_to
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type="html")

  def _StoreResults(self, experiment):
    """Writes the experiment file, reports and per-run results to disk.

    The results directory is removed and recreated first. Nothing is stored
    if the run was terminated.

    Args:
      experiment: The experiment whose results are stored in
        experiment.results_directory.
    """
    if self._terminated:
      return
    results_directory = experiment.results_directory
    FileUtils().RmDir(results_directory)
    FileUtils().MkDirP(results_directory)
    self.l.LogOutput("Storing experiment file in %s." % results_directory)
    experiment_file_path = os.path.join(results_directory,
                                        "experiment.exp")
    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput("Storing results report in %s." % results_directory)
    results_table_path = os.path.join(results_directory, "results.html")
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      JSONResultsReport(experiment).GetReport(results_directory)
    FileUtils().WriteFile(results_table_path, report)

    self.l.LogOutput("Storing email message body in %s." % results_directory)
    msg_file_path = os.path.join(results_directory, "msg_body.html")
    msg_body = self._GetEmailBody(experiment)
    FileUtils().WriteFile(msg_file_path, msg_body)

    self.l.LogOutput("Storing results of each benchmark run.")
    for benchmark_run in experiment.benchmark_runs:
      if benchmark_run.result:
        # Keep only alphanumeric characters for the directory name. A join
        # is used instead of filter(), which only returns a string for
        # Python 2 byte strings.
        benchmark_run_name = "".join(
            ch for ch in benchmark_run.name if ch.isalnum())
        benchmark_run_path = os.path.join(results_directory,
                                          benchmark_run_name)
        benchmark_run.result.CopyResultsTo(benchmark_run_path)
        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Runs the experiment, then prints, stores and e-mails the results.

    The results table is printed even if the run raised; results are stored
    and e-mailed only if the run was not terminated.
    """
    try:
      self._Run(self._experiment)
    finally:
      # Always print the report at the end of the run.
      self._PrintTable(self._experiment)
      if not self._terminated:
        self._StoreResults(self._experiment)
        self._Email(self._experiment)
270
271
class MockExperimentRunner(ExperimentRunner):
  """ExperimentRunner stand-in for tests.

  Every step of the run only logs what it would have done.
  """

  def __init__(self, experiment, json_report):
    """Initializes the base runner with its default optional arguments."""
    super(MockExperimentRunner, self).__init__(experiment, json_report)

  def _Run(self, experiment):
    """Logs the name of the experiment instead of running it."""
    message = ("Would run the following experiment: '%s'." %
               experiment.name)
    self.l.LogOutput(message)

  def _PrintTable(self, experiment):
    """Logs a notice instead of printing the results table."""
    message = "Would print the experiment table."
    self.l.LogOutput(message)

  def _Email(self, experiment):
    """Logs a notice instead of sending the result e-mail."""
    message = "Would send result email."
    self.l.LogOutput(message)

  def _StoreResults(self, experiment):
    """Logs a notice instead of storing the results."""
    message = "Would store the results."
    self.l.LogOutput(message)
290