experiment_runner.py revision db77ec326606edf53bffbd5f346f622445e15e04
1# Copyright 2011-2015 Google Inc. All Rights Reserved.
2
3"""The experiment runner module."""
4import getpass
5import os
6import shutil
7import time
8
9import afe_lock_machine
10
11from utils import command_executer
12from utils import logger
13from utils.email_sender import EmailSender
14from utils.file_utils import FileUtils
15
16import config
17from experiment_status import ExperimentStatus
18from results_cache import CacheConditions
19from results_cache import ResultsCache
20from results_report import HTMLResultsReport
21from results_report import TextResultsReport
22from results_report import JSONResultsReport
23from schedv2 import Schedv2
24
25
class ExperimentRunner(object):
  """Runs one experiment end to end.

  Locks the requested machines (unless the experiment uses a locks
  directory), starts the experiment, logs status while it runs, and
  afterwards prints, stores and emails the results.
  """

  # Seconds between status reports while the experiment is running.
  # Lowered to 10 per instance in __init__ for non-verbose logging.
  STATUS_TIME_DELAY = 30
  # Seconds to sleep between completion checks in the monitoring loop.
  THREAD_MONITOR_DELAY = 2

  def __init__(self, experiment, json_report, using_schedv2=False, log=None,
               cmd_exec=None):
    """Set up the runner for a single experiment.

    Args:
      experiment: The experiment object to run.
      json_report: If true, _StoreResults also generates a JSON report.
      using_schedv2: If true, a Schedv2 scheduler is attached to the
        experiment before it runs (feature in progress).
      log: Optional logger; defaults to logger.GetLogger(experiment.log_dir).
      cmd_exec: Optional command executer; defaults to one built on the
        logger.
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != "verbose":
      # Instance attribute shadows the class constant for this runner only.
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Return a list of all requested machines.

    Every label-specific request is expected to also appear in the global
    request list, so the global list (experiment.remote) is returned as is.
    Note that the returned object is the experiment's own list, not a copy.

    Returns:
      The experiment's list of requested machines.
    """
    machines = self._experiment.remote
    # All Label.remote is a sublist of experiment.remote.
    for label in self._experiment.labels:
      for r in label.remote:
        assert r in machines
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Update machines lists to contain only locked machines.

    Go through all the lists of requested machines, both global and
    label-specific requests, and remove any machine that we were not
    able to lock.

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    # Filter via slice assignment: the same list objects are updated in
    # place (other holders of these lists see the change), and we avoid
    # calling remove() on a list while iterating over it, which skips the
    # element following every removal.
    remote = self._experiment.remote
    remote[:] = [m for m in remote if m in locked_machines]

    for label in self._experiment.labels:
      label.remote[:] = [m for m in label.remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempt to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.

    Args:
      experiment: The experiment whose first label supplies chromeos_root.

    Raises:
      RuntimeError: If no machine could be locked.
    """
    lock_mgr = afe_lock_machine.AFELockManager(
        self._GetMachineList(),
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    for m in lock_mgr.machines:
      if not lock_mgr.MachineIsKnown(m):
        lock_mgr.AddLocalMachine(m)
    machine_states = lock_mgr.GetMachineStates("lock")
    lock_mgr.CheckMachineLocks(machine_states, "lock")
    self.locked_machines = lock_mgr.UpdateMachines(True)
    self._experiment.locked_machines = self.locked_machines
    self._UpdateMachineList(self.locked_machines)
    self._experiment.machine_manager.RemoveNonLockedMachines(
        self.locked_machines)
    if not self.locked_machines:
      raise RuntimeError("Unable to lock any machines.")

  def _UnlockAllMachines(self, experiment):
    """Attempt to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock all of the machines
    that this runner recorded as locked. It is a no-op when nothing is
    recorded as locked.

    Args:
      experiment: The experiment whose first label supplies chromeos_root.
    """
    if not self.locked_machines:
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    machine_states = lock_mgr.GetMachineStates("unlock")
    lock_mgr.CheckMachineLocks(machine_states, "unlock")
    lock_mgr.UpdateMachines(False)

  def _ClearCacheEntries(self, experiment):
    """Delete the on-disk cache directory of every benchmark run.

    Args:
      experiment: The experiment whose benchmark_runs are inspected.
    """
    for br in experiment.benchmark_runs:
      cache = ResultsCache()
      cache.Init(br.label.chromeos_image, br.label.chromeos_root,
                 br.benchmark.test_name, br.iteration, br.test_args,
                 br.profiler_args, br.machine_manager, br.machine,
                 br.label.board, br.cache_conditions, br._logger, br.log_level,
                 br.label, br.share_cache, br.benchmark.suite,
                 br.benchmark.show_all_results, br.benchmark.run_local)
      cache_dir = cache._GetCacheDirForWrite()
      if os.path.exists(cache_dir):
        self.l.LogOutput("Removing cache dir: %s" % cache_dir)
        shutil.rmtree(cache_dir)

  def _Run(self, experiment):
    """Lock machines, run the experiment and log status until it completes.

    Machines are locked through AFE only when experiment.locks_dir is not
    set, and in that case they are unlocked again on the way out, whether
    or not the run succeeded.

    Args:
      experiment: The experiment to run.

    Raises:
      KeyboardInterrupt, SystemExit: Re-raised after the experiment has been
        terminated and self._terminated has been set.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      if CacheConditions.FALSE in experiment.cache_conditions:
        self._ClearCacheEntries(experiment)
      status = ExperimentStatus(experiment)
      experiment.Run()
      # Zero forces a status report on the first pass through the loop.
      last_status_time = 0
      last_status_string = ""
      try:
        if experiment.log_level != "verbose":
          self.l.LogStartDots()
        while not experiment.IsComplete():
          if last_status_time + self.STATUS_TIME_DELAY < time.time():
            last_status_time = time.time()
            border = "=============================="
            if experiment.log_level == "verbose":
              self.l.LogOutput(border)
              self.l.LogOutput(status.GetProgressString())
              self.l.LogOutput(status.GetStatusString())
              self.l.LogOutput(border)
            else:
              # Non-verbose: print the status only when it changed,
              # otherwise just append a progress dot.
              current_status_string = status.GetStatusString()
              if current_status_string != last_status_string:
                self.l.LogEndDots()
                self.l.LogOutput(border)
                self.l.LogOutput(current_status_string)
                self.l.LogOutput(border)
                last_status_string = current_status_string
              else:
                self.l.LogAppendDot()
          time.sleep(self.THREAD_MONITOR_DELAY)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError("Ctrl-c pressed. Cleaning up...")
        experiment.Terminate()
        raise
      except SystemExit:
        self._terminated = True
        self.l.LogError("Unexpected exit. Cleaning up...")
        experiment.Terminate()
        raise
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _PrintTable(self, experiment):
    """Log the text results report for the experiment."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _BuildMessageBody(self, experiment):
    """Return the HTML-wrapped text report used as the email message body."""
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    return "<pre style='font-size: 13px'>%s</pre>" % text_report

  def _Email(self, experiment):
    """Email the results report to the current user and experiment.email_to.

    Nothing is sent when the "no_email" config option is set, or when every
    benchmark run was a cache hit and no explicit recipients were given.

    Args:
      experiment: The experiment whose results are reported.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not benchmark_run.cache_hit
                    for benchmark_run in experiment.benchmark_runs)
    if ((not send_mail and not experiment.email_to)
        or config.GetConfig("no_email")):
      return

    label_names = [label.name for label in experiment.labels]
    subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

    text_report = self._BuildMessageBody(experiment)
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment("report.html", html_report)
    # email_to may be empty/None when the mail is triggered by a new run.
    email_to = [getpass.getuser()] + list(experiment.email_to or [])
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type="html")

  def _StoreResults(self, experiment):
    """Write the experiment file, reports and per-run results to disk.

    The results directory is removed and recreated first. Nothing is stored
    if the run was terminated.

    Args:
      experiment: The experiment whose results are stored.
    """
    if self._terminated:
      return
    results_directory = experiment.results_directory
    FileUtils().RmDir(results_directory)
    FileUtils().MkDirP(results_directory)
    self.l.LogOutput("Storing experiment file in %s." % results_directory)
    experiment_file_path = os.path.join(results_directory,
                                        "experiment.exp")
    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput("Storing results report in %s." % results_directory)
    results_table_path = os.path.join(results_directory, "results.html")
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      JSONResultsReport(experiment).GetReport(results_directory)
    FileUtils().WriteFile(results_table_path, report)

    self.l.LogOutput("Storing email message body in %s." % results_directory)
    msg_file_path = os.path.join(results_directory, "msg_body.html")
    FileUtils().WriteFile(msg_file_path, self._BuildMessageBody(experiment))

    self.l.LogOutput("Storing results of each benchmark run.")
    for benchmark_run in experiment.benchmark_runs:
      if benchmark_run.result:
        # Keep only alphanumeric characters for the directory name. Built
        # with join() so the result is always a string; filter() only
        # returns a string for Python 2 str input.
        benchmark_run_name = "".join(
            ch for ch in benchmark_run.name if ch.isalnum())
        benchmark_run_path = os.path.join(results_directory,
                                          benchmark_run_name)
        benchmark_run.result.CopyResultsTo(benchmark_run_path)
        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Run the experiment, then print, store and email its results.

    The report table is printed even if the run raised; results are stored
    and emailed only when the run was not terminated.
    """
    try:
      self._Run(self._experiment)
    finally:
      # Always print the report at the end of the run.
      self._PrintTable(self._experiment)
      if not self._terminated:
        self._StoreResults(self._experiment)
        self._Email(self._experiment)
265
266
class MockExperimentRunner(ExperimentRunner):
  """Mocked ExperimentRunner for testing.

  Every step that would run, print, email or store results is replaced by
  a log line describing what would have happened.
  """

  def __init__(self, experiment, json_report=False):
    """Set up the mock runner.

    Args:
      experiment: The experiment object that would be run.
      json_report: Forwarded to ExperimentRunner.__init__, which requires
        it; defaults to False so existing one-argument callers keep working.
    """
    super(MockExperimentRunner, self).__init__(experiment, json_report)

  def _Run(self, experiment):
    """Log the experiment name instead of running the experiment."""
    self.l.LogOutput("Would run the following experiment: '%s'." %
                     experiment.name)

  def _PrintTable(self, experiment):
    """Log a placeholder instead of printing the results table."""
    self.l.LogOutput("Would print the experiment table.")

  def _Email(self, experiment):
    """Log a placeholder instead of sending the result email."""
    self.l.LogOutput("Would send result email.")

  def _StoreResults(self, experiment):
    """Log a placeholder instead of writing results to disk."""
    self.l.LogOutput("Would store the results.")
285