experiment_runner.py revision 6a7dfb33f661a0e0193716a3fcc46a6984486401
1# Copyright 2011-2015 Google Inc. All Rights Reserved.
2
3"""The experiment runner module."""
4import getpass
5import os
6import shutil
7import time
8
9import afe_lock_machine
10import test_flag
11
12from cros_utils import command_executer
13from cros_utils import logger
14from cros_utils.email_sender import EmailSender
15from cros_utils.file_utils import FileUtils
16
17import config
18from experiment_status import ExperimentStatus
19from results_cache import CacheConditions
20from results_cache import ResultsCache
21from results_report import HTMLResultsReport
22from results_report import TextResultsReport
23from results_report import JSONResultsReport
24from schedv2 import Schedv2
25
26
class ExperimentRunner(object):
  """Drives a crosperf experiment from machine locking to result reporting.

  Run() locks the requested machines (unless the experiment supplies a
  locks_dir), optionally clears existing cache entries, starts the
  experiment, logs its status until it completes, and finally prints,
  stores and emails the results.
  """

  # Seconds between status reports; __init__ lowers it for non-verbose runs.
  STATUS_TIME_DELAY = 30
  # Seconds to sleep between checks of experiment.IsComplete().
  THREAD_MONITOR_DELAY = 2

  def __init__(self, experiment, json_report, using_schedv2=False, log=None,
               cmd_exec=None):
    """Create a runner for `experiment`.

    Args:
      experiment: The experiment to run.
      json_report: If true, a JSON report is also written when storing
        results.
      using_schedv2: If true, a Schedv2 scheduler is installed on the
        experiment before it runs.
      log: Logger to use; defaults to logger.GetLogger(experiment.log_dir).
      cmd_exec: Command executer; defaults to one created for the logger.
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != "verbose":
      # Non-verbose runs report more often. This instance attribute shadows
      # the class-level default for this runner only.
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Return the list of all machines requested for this experiment.

    Every label-specific machine list must be a subset of the global
    experiment.remote list. This is asserted rather than merged, so the
    global list alone covers every request.

    Returns:
      The experiment's own remote list (not a copy).
    """
    machines = self._experiment.remote
    # All Label.remote is a sublist of experiment.remote.
    for label in self._experiment.labels:
      for remote in label.remote:
        assert remote in machines
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Update machine lists to contain only locked machines.

    Filters the global experiment.remote list and every label's remote list,
    dropping any machine that we were not able to lock. The lists are
    modified in place, so other references to the same list objects see the
    result.

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    # Build each filtered list completely before writing it back. Calling
    # list.remove() while iterating the same list skips the element that
    # follows every removal.
    remote = self._experiment.remote
    remote[:] = [m for m in remote if m in locked_machines]

    for label in self._experiment.labels:
      label.remote[:] = [m for m in label.remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempt to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.
    In test mode nothing is locked and every requested machine is treated as
    locked.

    Args:
      experiment: The experiment being run; its first label's chromeos_root
        is handed to the lock manager.

    Raises:
      RuntimeError: If no machine could be locked.
    """
    if test_flag.GetTestMode():
      self.locked_machines = self._GetMachineList()
      self._experiment.locked_machines = self.locked_machines
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self._GetMachineList(),
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    # Register machines the lock manager does not recognize before querying
    # their lock states.
    for m in lock_mgr.machines:
      if not lock_mgr.MachineIsKnown(m):
        lock_mgr.AddLocalMachine(m)
    machine_states = lock_mgr.GetMachineStates("lock")
    lock_mgr.CheckMachineLocks(machine_states, "lock")
    self.locked_machines = lock_mgr.UpdateMachines(True)
    self._experiment.locked_machines = self.locked_machines
    # Forget every machine that could not be locked.
    self._UpdateMachineList(self.locked_machines)
    self._experiment.machine_manager.RemoveNonLockedMachines(
        self.locked_machines)
    if not self.locked_machines:
      raise RuntimeError("Unable to lock any machines.")

  def _UnlockAllMachines(self, experiment):
    """Attempt to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock all of the machines
    requested for this crosperf run. It is a no-op in test mode or when no
    machine was locked.

    Args:
      experiment: The experiment being run; its first label's chromeos_root
        is handed to the lock manager.
    """
    if not self.locked_machines or test_flag.GetTestMode():
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    machine_states = lock_mgr.GetMachineStates("unlock")
    lock_mgr.CheckMachineLocks(machine_states, "unlock")
    lock_mgr.UpdateMachines(False)

  def _ClearCacheEntries(self, experiment):
    """Delete the on-disk cache directory of every benchmark run, if present.

    Used when the experiment's cache conditions contain CacheConditions.FALSE.
    For each run, the directory it would write its results to is removed.
    """
    for br in experiment.benchmark_runs:
      cache = ResultsCache()
      cache.Init(br.label.chromeos_image, br.label.chromeos_root,
                 br.benchmark.test_name, br.iteration, br.test_args,
                 br.profiler_args, br.machine_manager, br.machine,
                 br.label.board, br.cache_conditions, br._logger, br.log_level,
                 br.label, br.share_cache, br.benchmark.suite,
                 br.benchmark.show_all_results, br.benchmark.run_local)
      cache_dir = cache._GetCacheDirForWrite()
      if os.path.exists(cache_dir):
        self.l.LogOutput("Removing cache dir: %s" % cache_dir)
        shutil.rmtree(cache_dir)

  def _Run(self, experiment):
    """Run `experiment` to completion while reporting its status.

    Machines are locked first unless experiment.locks_dir is set, and in that
    case they are always unlocked again on the way out, including on errors
    and interrupts.

    Raises:
      KeyboardInterrupt, SystemExit: Re-raised after marking this runner as
        terminated and terminating the experiment.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      if CacheConditions.FALSE in experiment.cache_conditions:
        self._ClearCacheEntries(experiment)
      status = ExperimentStatus(experiment)
      experiment.Run()
      try:
        self._MonitorExperiment(experiment, status)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError("Ctrl-c pressed. Cleaning up...")
        experiment.Terminate()
        raise
      except SystemExit:
        self._terminated = True
        self.l.LogError("Unexpected exit. Cleaning up...")
        experiment.Terminate()
        raise
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _MonitorExperiment(self, experiment, status):
    """Poll `experiment` until it is complete, logging status along the way.

    Status is reported at most every STATUS_TIME_DELAY seconds:
      * Verbose runs log both the progress and the status strings.
      * Other runs log the status string only when it changed since the last
        report, and otherwise append a progress dot.

    Args:
      experiment: The running experiment.
      status: The ExperimentStatus used to render progress/status strings.
    """
    border = "=============================="
    last_status_time = 0
    last_status_string = ""
    if experiment.log_level != "verbose":
      self.l.LogStartDots()
    while not experiment.IsComplete():
      if last_status_time + self.STATUS_TIME_DELAY < time.time():
        last_status_time = time.time()
        if experiment.log_level == "verbose":
          self.l.LogOutput(border)
          self.l.LogOutput(status.GetProgressString())
          self.l.LogOutput(status.GetStatusString())
          self.l.LogOutput(border)
        else:
          current_status_string = status.GetStatusString()
          if current_status_string != last_status_string:
            self.l.LogEndDots()
            self.l.LogOutput(border)
            self.l.LogOutput(current_status_string)
            self.l.LogOutput(border)
            last_status_string = current_status_string
          else:
            self.l.LogAppendDot()
      time.sleep(self.THREAD_MONITOR_DELAY)

  def _PrintTable(self, experiment):
    """Log the plain-text results report."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _GetMessageBody(self, experiment):
    """Return the HTML message body shared by the email and msg_body.html.

    The body is the email-flavoured text report plus the results directory,
    wrapped in a <pre> element.
    """
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    return "<pre style='font-size: 13px'>%s</pre>" % text_report

  def _Email(self, experiment):
    """Email the results to the requested recipients and the current user.

    Mail is sent by default only if at least one benchmark run was not a
    cache hit. A non-empty experiment.email_to forces sending. Nothing is
    sent when the "no_email" config option is set.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not br.cache_hit for br in experiment.benchmark_runs)
    if ((not send_mail and not experiment.email_to) or
        config.GetConfig("no_email")):
      return

    label_names = [label.name for label in experiment.labels]
    subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

    text_report = self._GetMessageBody(experiment)
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment("report.html", html_report)
    # Copy the list so the current user is not appended to the experiment's
    # own email_to list; mutating it would add duplicates on every call.
    email_to = list(experiment.email_to or [])
    email_to.append(getpass.getuser())
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type="html")

  @staticmethod
  def _SanitizeRunName(name):
    """Return `name` with every non-alphanumeric character removed.

    Used to derive a directory name from a benchmark run name. str.join is
    used so the result is a string on Python 2 and 3 alike; filter() returns
    an iterator on Python 3.
    """
    return "".join(c for c in name if c.isalnum())

  def _StoreResults(self, experiment):
    """Write the experiment file, reports and per-run results to disk.

    The results directory is deleted and recreated first, so previous
    contents are lost. Each benchmark run's result is copied and then
    cleaned up. Does nothing if the run was terminated.
    """
    if self._terminated:
      return
    file_utils = FileUtils()
    results_directory = experiment.results_directory
    file_utils.RmDir(results_directory)
    file_utils.MkDirP(results_directory)
    self.l.LogOutput("Storing experiment file in %s." % results_directory)
    experiment_file_path = os.path.join(results_directory,
                                        "experiment.exp")
    file_utils.WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput("Storing results report in %s." % results_directory)
    results_table_path = os.path.join(results_directory, "results.html")
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      JSONResultsReport(experiment).GetReport(results_directory)
    file_utils.WriteFile(results_table_path, report)

    self.l.LogOutput("Storing email message body in %s." % results_directory)
    msg_file_path = os.path.join(results_directory, "msg_body.html")
    file_utils.WriteFile(msg_file_path, self._GetMessageBody(experiment))

    self.l.LogOutput("Storing results of each benchmark run.")
    for benchmark_run in experiment.benchmark_runs:
      if not benchmark_run.result:
        continue
      benchmark_run_path = os.path.join(
          results_directory, self._SanitizeRunName(benchmark_run.name))
      benchmark_run.result.CopyResultsTo(benchmark_run_path)
      benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Run the experiment, then report its results.

    The results table is always printed, even if the run raised. Results are
    stored and emailed only if the run was not terminated by Ctrl-C or an
    unexpected exit.
    """
    try:
      self._Run(self._experiment)
    finally:
      # Always print the report at the end of the run.
      self._PrintTable(self._experiment)
      if not self._terminated:
        self._StoreResults(self._experiment)
        self._Email(self._experiment)
271
272
class MockExperimentRunner(ExperimentRunner):
  """ExperimentRunner stand-in that logs each step instead of performing it.

  Running the experiment, printing the table, storing results and emailing
  are each replaced by a single log line. ExperimentRunner.Run can then be
  driven end to end without locking machines, writing the results directory
  or sending mail.
  """

  def __init__(self, experiment, json_report):
    # Accept only the two required arguments; the logger and command
    # executer fall back to the base class defaults.
    super(MockExperimentRunner, self).__init__(experiment, json_report)

  def _Run(self, experiment):
    message = ("Would run the following experiment: '%s'." %
               experiment.name)
    self.l.LogOutput(message)

  def _PrintTable(self, experiment):
    message = "Would print the experiment table."
    self.l.LogOutput(message)

  def _Email(self, experiment):
    message = "Would send result email."
    self.l.LogOutput(message)

  def _StoreResults(self, experiment):
    message = "Would store the results."
    self.l.LogOutput(message)
291