experiment_runner.py revision f9b503542669e34f83e5f6bfd3c4e19a0c43e015
1#!/usr/bin/python
2
3# Copyright 2011-2015 Google Inc. All Rights Reserved.
4
5"""The experiment runner module."""
6import getpass
7import os
8import random
9import shutil
10import sys
11import time
12
13import afe_lock_machine
14
15from utils import command_executer
16from utils import logger
17from utils.email_sender import EmailSender
18from utils.file_utils import FileUtils
19
20import config
21from experiment_status import ExperimentStatus
22from results_cache import CacheConditions
23from results_cache import ResultsCache
24from results_report import HTMLResultsReport
25from results_report import TextResultsReport
26from results_report import JSONResultsReport
27from schedv2 import Schedv2
28
29
30class ExperimentRunner(object):
31  """ExperimentRunner Class."""
32
33  STATUS_TIME_DELAY = 30
34  THREAD_MONITOR_DELAY = 2
35
36  def __init__(self, experiment, json_report, using_schedv2=False, log=None,
37               cmd_exec=None):
38    self._experiment = experiment
39    self.l = log or logger.GetLogger(experiment.log_dir)
40    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
41    self._terminated = False
42    self.json_report = json_report
43    self.locked_machines = []
44    if experiment.log_level != "verbose":
45      self.STATUS_TIME_DELAY = 10
46
47    # Setting this to True will use crosperf sched v2 (feature in progress).
48    self._using_schedv2 = using_schedv2
49
50  def _GetMachineList(self):
51    """Return a list of all requested machines.
52
53    Create a list of all the requested machines, both global requests and
54    label-specific requests, and return the list.
55    """
56    machines = self._experiment.remote
57    # All Label.remote is a sublist of experiment.remote.
58    for l in self._experiment.labels:
59      for r in l.remote:
60        assert r in machines
61    return machines
62
63  def _UpdateMachineList(self, locked_machines):
64    """Update machines lists to contain only locked machines.
65
66    Go through all the lists of requested machines, both global and
67    label-specific requests, and remove any machine that we were not
68    able to lock.
69
70    Args:
71      locked_machines: A list of the machines we successfully locked.
72    """
73    for m in self._experiment.remote:
74      if m not in locked_machines:
75        self._experiment.remote.remove(m)
76
77    for l in self._experiment.labels:
78      for m in l.remote:
79        if m not in locked_machines:
80          l.remote.remove(m)
81
82  def _LockAllMachines(self, experiment):
83    """Attempt to globally lock all of the machines requested for run.
84
85    This method will use the AFE server to globally lock all of the machines
86    requested for this crosperf run, to prevent any other crosperf runs from
87    being able to update/use the machines while this experiment is running.
88    """
89    lock_mgr = afe_lock_machine.AFELockManager(
90        self._GetMachineList(),
91        "",
92        experiment.labels[0].chromeos_root,
93        None,
94        log=self.l,
95    )
96    for m in lock_mgr.machines:
97      if not lock_mgr.MachineIsKnown(m):
98        lock_mgr.AddLocalMachine(m)
99    machine_states = lock_mgr.GetMachineStates("lock")
100    lock_mgr.CheckMachineLocks(machine_states, "lock")
101    self.locked_machines = lock_mgr.UpdateMachines(True)
102    self._experiment.locked_machines = self.locked_machines
103    self._UpdateMachineList(self.locked_machines)
104    self._experiment.machine_manager.RemoveNonLockedMachines(
105        self.locked_machines)
106    if len(self.locked_machines) == 0:
107        raise RuntimeError("Unable to lock any machines.")
108
109  def _UnlockAllMachines(self, experiment):
110    """Attempt to globally unlock all of the machines requested for run.
111
112    The method will use the AFE server to globally unlock all of the machines
113    requested for this crosperf run.
114    """
115    if not self.locked_machines:
116        return
117
118    lock_mgr = afe_lock_machine.AFELockManager(
119        self.locked_machines,
120        "",
121        experiment.labels[0].chromeos_root,
122        None,
123        log=self.l,
124    )
125    machine_states = lock_mgr.GetMachineStates("unlock")
126    lock_mgr.CheckMachineLocks(machine_states, "unlock")
127    lock_mgr.UpdateMachines(False)
128
129  def _ClearCacheEntries(self, experiment):
130    for br in experiment.benchmark_runs:
131      cache = ResultsCache()
132      cache.Init (br.label.chromeos_image, br.label.chromeos_root,
133                  br.benchmark.test_name, br.iteration, br.test_args,
134                  br.profiler_args, br.machine_manager, br.machine,
135                  br.label.board, br.cache_conditions, br._logger, br.log_level,
136                  br.label, br.share_cache, br.benchmark.suite,
137                  br.benchmark.show_all_results, br.benchmark.run_local)
138      cache_dir = cache._GetCacheDirForWrite()
139      if os.path.exists(cache_dir):
140        self.l.LogOutput("Removing cache dir: %s" % cache_dir)
141        shutil.rmtree(cache_dir)
142
143  def _Run(self, experiment):
144    try:
145      if not experiment.locks_dir:
146        self._LockAllMachines(experiment)
147      if self._using_schedv2:
148        schedv2 = Schedv2(experiment)
149        experiment.set_schedv2(schedv2)
150      if CacheConditions.FALSE in experiment.cache_conditions:
151        self._ClearCacheEntries(experiment)
152      status = ExperimentStatus(experiment)
153      experiment.Run()
154      last_status_time = 0
155      last_status_string = ""
156      try:
157        if experiment.log_level != "verbose":
158          self.l.LogStartDots()
159        while not experiment.IsComplete():
160          if last_status_time + self.STATUS_TIME_DELAY < time.time():
161            last_status_time = time.time()
162            border = "=============================="
163            if experiment.log_level == "verbose":
164              self.l.LogOutput(border)
165              self.l.LogOutput(status.GetProgressString())
166              self.l.LogOutput(status.GetStatusString())
167              self.l.LogOutput(border)
168            else:
169              current_status_string = status.GetStatusString()
170              if (current_status_string != last_status_string):
171                self.l.LogEndDots()
172                self.l.LogOutput(border)
173                self.l.LogOutput(current_status_string)
174                self.l.LogOutput(border)
175                last_status_string = current_status_string
176              else:
177                self.l.LogAppendDot()
178          time.sleep(self.THREAD_MONITOR_DELAY)
179      except KeyboardInterrupt:
180        self._terminated = True
181        self.l.LogError("Ctrl-c pressed. Cleaning up...")
182        experiment.Terminate()
183    finally:
184      if not experiment.locks_dir:
185        self._UnlockAllMachines(experiment)
186
187  def _PrintTable(self, experiment):
188    self.l.LogOutput(TextResultsReport(experiment).GetReport())
189
190  def _Email(self, experiment):
191    # Only email by default if a new run was completed.
192    send_mail = False
193    for benchmark_run in experiment.benchmark_runs:
194      if not benchmark_run.cache_hit:
195        send_mail = True
196        break
197    if (not send_mail and not experiment.email_to
198        or config.GetConfig("no_email")):
199      return
200
201    label_names = []
202    for label in experiment.labels:
203      label_names.append(label.name)
204    subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))
205
206    text_report = TextResultsReport(experiment, True).GetReport()
207    text_report += ("\nResults are stored in %s.\n" %
208                    experiment.results_directory)
209    text_report = "<pre style='font-size: 13px'>%s</pre>" % text_report
210    html_report = HTMLResultsReport(experiment).GetReport()
211    attachment = EmailSender.Attachment("report.html", html_report)
212    email_to = [getpass.getuser()] + experiment.email_to
213    EmailSender().SendEmail(email_to,
214                            subject,
215                            text_report,
216                            attachments=[attachment],
217                            msg_type="html")
218
219  def _StoreResults (self, experiment):
220    if self._terminated:
221      return
222    results_directory = experiment.results_directory
223    FileUtils().RmDir(results_directory)
224    FileUtils().MkDirP(results_directory)
225    self.l.LogOutput("Storing experiment file in %s." % results_directory)
226    experiment_file_path = os.path.join(results_directory,
227                                        "experiment.exp")
228    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)
229
230    self.l.LogOutput("Storing results report in %s." % results_directory)
231    results_table_path = os.path.join(results_directory, "results.html")
232    report = HTMLResultsReport(experiment).GetReport()
233    if self.json_report:
234      JSONResultsReport(experiment).GetReport(results_directory)
235    FileUtils().WriteFile(results_table_path, report)
236
237    self.l.LogOutput("Storing email message body in %s." % results_directory)
238    msg_file_path = os.path.join(results_directory, "msg_body.html")
239    text_report = TextResultsReport(experiment, True).GetReport()
240    text_report += ("\nResults are stored in %s.\n" %
241                    experiment.results_directory)
242    msg_body = "<pre style='font-size: 13px'>%s</pre>" % text_report
243    FileUtils().WriteFile(msg_file_path, msg_body)
244
245    self.l.LogOutput("Storing results of each benchmark run.")
246    for benchmark_run in experiment.benchmark_runs:
247      if benchmark_run.result:
248        benchmark_run_name = filter(str.isalnum, benchmark_run.name)
249        benchmark_run_path = os.path.join(results_directory,
250                                          benchmark_run_name)
251        benchmark_run.result.CopyResultsTo(benchmark_run_path)
252        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)
253
254  def Run(self):
255    self._Run(self._experiment)
256    self._PrintTable(self._experiment)
257    if not self._terminated:
258      self._StoreResults(self._experiment)
259      self._Email(self._experiment)
260
261
262class MockExperimentRunner(ExperimentRunner):
263  """Mocked ExperimentRunner for testing."""
264
265  def __init__(self, experiment):
266    super(MockExperimentRunner, self).__init__(experiment)
267
268  def _Run(self, experiment):
269    self.l.LogOutput("Would run the following experiment: '%s'." %
270                     experiment.name)
271
272  def _PrintTable(self, experiment):
273    self.l.LogOutput("Would print the experiment table.")
274
275  def _Email(self, experiment):
276    self.l.LogOutput("Would send result email.")
277
278  def _StoreResults(self, experiment):
279    self.l.LogOutput("Would store the results.")
280