experiment_runner.py revision 45b53c5fee4c044cebd74dbbbd6119fbd51554e7
1#!/usr/bin/python
2
3# Copyright 2011-2015 Google Inc. All Rights Reserved.
4
5"""The experiment runner module."""
6import getpass
7import os
8import random
9import shutil
10import sys
11import time
12
13import afe_lock_machine
14
15from utils import command_executer
16from utils import logger
17from utils.email_sender import EmailSender
18from utils.file_utils import FileUtils
19
20import config
21from experiment_status import ExperimentStatus
22from results_cache import CacheConditions
23from results_cache import ResultsCache
24from results_report import HTMLResultsReport
25from results_report import TextResultsReport
26from results_report import JSONResultsReport
27from schedv2 import Schedv2
28
29
class ExperimentRunner(object):
  """Drives a single experiment from machine locking to result reporting.

  The runner locks the requested machines (unless the experiment supplies
  its own locks_dir), starts the experiment, periodically logs its status
  until it completes, unlocks the machines, and finally prints, stores and
  emails the results.
  """

  # Minimum number of seconds between two status reports in the log.
  STATUS_TIME_DELAY = 30
  # Number of seconds to sleep between two completion checks.
  THREAD_MONITOR_DELAY = 2

  def __init__(self, experiment, json_report, using_schedv2=False, log=None,
               cmd_exec=None):
    """Initialize the runner.

    Args:
      experiment: The experiment object to run.
      json_report: If true, _StoreResults also generates a JSON report in
        the results directory.
      using_schedv2: If true, a Schedv2 scheduler is attached to the
        experiment before it is run.
      log: Logger to use; defaults to logger.GetLogger(experiment.log_dir).
      cmd_exec: Command executer to use; defaults to
        command_executer.GetCommandExecuter(log).
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    # Set when the run is interrupted (Ctrl-C or SystemExit); Run() then
    # skips storing and emailing results.
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != "verbose":
      # Shadow the class-level default on this instance only.
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Return a list of all requested machines.

    Every label-specific remote is expected to also appear in the
    experiment's global remote list, so the global list already is the
    full list of requested machines.

    Returns:
      The experiment's own remote list object (not a copy).
    """
    machines = self._experiment.remote
    # All Label.remote is a sublist of experiment.remote.
    for l in self._experiment.labels:
      for r in l.remote:
        assert r in machines
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Update machines lists to contain only locked machines.

    Go through all the lists of requested machines, both global and
    label-specific requests, and remove any machine that we were not
    able to lock.

    The lists are filtered in place through slice assignment: every
    existing reference to them sees the update, and no list is mutated
    while it is being iterated (calling remove() inside the loop skips the
    element that follows each removed one).

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    remote = self._experiment.remote
    remote[:] = [m for m in remote if m in locked_machines]

    for l in self._experiment.labels:
      l.remote[:] = [m for m in l.remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempt to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.

    Args:
      experiment: The experiment whose machines are to be locked.

    Raises:
      RuntimeError: If no machine at all could be locked.
    """
    lock_mgr = afe_lock_machine.AFELockManager(
        self._GetMachineList(),
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    for m in lock_mgr.machines:
      if not lock_mgr.MachineIsKnown(m):
        lock_mgr.AddLocalMachine(m)
    machine_states = lock_mgr.GetMachineStates("lock")
    lock_mgr.CheckMachineLocks(machine_states, "lock")
    self.locked_machines = lock_mgr.UpdateMachines(True)
    self._experiment.locked_machines = self.locked_machines
    # Drop every machine we failed to lock from all bookkeeping lists.
    self._UpdateMachineList(self.locked_machines)
    self._experiment.machine_manager.RemoveNonLockedMachines(
        self.locked_machines)
    if not self.locked_machines:
      raise RuntimeError("Unable to lock any machines.")

  def _UnlockAllMachines(self, experiment):
    """Attempt to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock all of the machines
    requested for this crosperf run. It is a no-op when no machine is
    recorded as locked.

    Args:
      experiment: The experiment whose machines are to be unlocked.
    """
    if not self.locked_machines:
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    machine_states = lock_mgr.GetMachineStates("unlock")
    lock_mgr.CheckMachineLocks(machine_states, "unlock")
    lock_mgr.UpdateMachines(False)

  def _ClearCacheEntries(self, experiment):
    """Delete the on-disk cache directory of every benchmark run.

    Args:
      experiment: The experiment whose benchmark runs are inspected.
    """
    for br in experiment.benchmark_runs:
      cache = ResultsCache()
      cache.Init(br.label.chromeos_image, br.label.chromeos_root,
                 br.benchmark.test_name, br.iteration, br.test_args,
                 br.profiler_args, br.machine_manager, br.machine,
                 br.label.board, br.cache_conditions, br._logger, br.log_level,
                 br.label, br.share_cache, br.benchmark.suite,
                 br.benchmark.show_all_results, br.benchmark.run_local)
      cache_dir = cache._GetCacheDirForWrite()
      if os.path.exists(cache_dir):
        self.l.LogOutput("Removing cache dir: %s" % cache_dir)
        shutil.rmtree(cache_dir)

  def _Run(self, experiment):
    """Run the experiment and log its status until it completes.

    Machines are locked first (unless experiment.locks_dir is set) and are
    always unlocked again on the way out. On KeyboardInterrupt or SystemExit
    during monitoring, the experiment is terminated, the runner is flagged
    as terminated, and the exception is re-raised.

    Args:
      experiment: The experiment to run.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      if CacheConditions.FALSE in experiment.cache_conditions:
        self._ClearCacheEntries(experiment)
      status = ExperimentStatus(experiment)
      experiment.Run()
      last_status_time = 0
      last_status_string = ""
      try:
        if experiment.log_level != "verbose":
          self.l.LogStartDots()
        while not experiment.IsComplete():
          if last_status_time + self.STATUS_TIME_DELAY < time.time():
            last_status_time = time.time()
            border = "=============================="
            if experiment.log_level == "verbose":
              self.l.LogOutput(border)
              self.l.LogOutput(status.GetProgressString())
              self.l.LogOutput(status.GetStatusString())
              self.l.LogOutput(border)
            else:
              # Non-verbose: print the status only when it changed,
              # otherwise just append a progress dot.
              current_status_string = status.GetStatusString()
              if current_status_string != last_status_string:
                self.l.LogEndDots()
                self.l.LogOutput(border)
                self.l.LogOutput(current_status_string)
                self.l.LogOutput(border)
                last_status_string = current_status_string
              else:
                self.l.LogAppendDot()
          time.sleep(self.THREAD_MONITOR_DELAY)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError("Ctrl-c pressed. Cleaning up...")
        experiment.Terminate()
        raise
      except SystemExit:
        # Must set the same flag Run() checks; otherwise results would
        # still be stored and emailed after an abnormal exit.
        self._terminated = True
        self.l.LogError("Unexpected exit. Cleaning up...")
        experiment.Terminate()
        raise
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _PrintTable(self, experiment):
    """Log the text results report of the experiment."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _Email(self, experiment):
    """Email the results report to the current user and email_to recipients.

    No mail is sent when the "no_email" config option is set, or when every
    benchmark run was a cache hit and the experiment lists no explicit
    recipients.

    Args:
      experiment: The experiment whose results are reported.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not benchmark_run.cache_hit
                    for benchmark_run in experiment.benchmark_runs)
    if ((not send_mail and not experiment.email_to)
        or config.GetConfig("no_email")):
      return

    label_names = [label.name for label in experiment.labels]
    subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    text_report = "<pre style='font-size: 13px'>%s</pre>" % text_report
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment("report.html", html_report)
    email_to = [getpass.getuser()] + experiment.email_to
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type="html")

  def _StoreResults(self, experiment):
    """Write the experiment file, reports and per-run results to disk.

    The results directory is removed and recreated first. Nothing is stored
    if the run was terminated.

    Args:
      experiment: The experiment whose results are stored.
    """
    if self._terminated:
      return
    results_directory = experiment.results_directory
    FileUtils().RmDir(results_directory)
    FileUtils().MkDirP(results_directory)
    self.l.LogOutput("Storing experiment file in %s." % results_directory)
    experiment_file_path = os.path.join(results_directory,
                                        "experiment.exp")
    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput("Storing results report in %s." % results_directory)
    results_table_path = os.path.join(results_directory, "results.html")
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      JSONResultsReport(experiment).GetReport(results_directory)
    FileUtils().WriteFile(results_table_path, report)

    self.l.LogOutput("Storing email message body in %s." % results_directory)
    msg_file_path = os.path.join(results_directory, "msg_body.html")
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    msg_body = "<pre style='font-size: 13px'>%s</pre>" % text_report
    FileUtils().WriteFile(msg_file_path, msg_body)

    self.l.LogOutput("Storing results of each benchmark run.")
    for benchmark_run in experiment.benchmark_runs:
      if benchmark_run.result:
        # Keep only alphanumeric characters for the directory name. A join
        # (rather than filter()) yields a string on both Python 2 and 3.
        benchmark_run_name = "".join(
            ch for ch in benchmark_run.name if ch.isalnum())
        benchmark_run_path = os.path.join(results_directory,
                                          benchmark_run_name)
        benchmark_run.result.CopyResultsTo(benchmark_run_path)
        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Run the experiment, then print, store and email its results.

    The results table is printed even if the run raised; results are stored
    and emailed only if the run was not terminated.
    """
    try:
      self._Run(self._experiment)
    finally:
      # Always print the report at the end of the run.
      self._PrintTable(self._experiment)
      if not self._terminated:
        self._StoreResults(self._experiment)
        self._Email(self._experiment)
269
270
class MockExperimentRunner(ExperimentRunner):
  """Mocked ExperimentRunner for testing.

  Every step of the run is replaced by a log message describing what would
  have been done.
  """

  def __init__(self, experiment, json_report=False):
    """Initialize the mock runner.

    Args:
      experiment: The experiment object that would be run.
      json_report: Forwarded to ExperimentRunner.__init__, which requires
        it; defaults to False so existing one-argument callers keep working.
    """
    super(MockExperimentRunner, self).__init__(experiment, json_report)

  def _Run(self, experiment):
    """Log the experiment that would be run instead of running it."""
    self.l.LogOutput("Would run the following experiment: '%s'." %
                     experiment.name)

  def _PrintTable(self, experiment):
    """Log that the results table would be printed."""
    self.l.LogOutput("Would print the experiment table.")

  def _Email(self, experiment):
    """Log that the result email would be sent."""
    self.l.LogOutput("Would send result email.")

  def _StoreResults(self, experiment):
    """Log that the results would be stored."""
    self.l.LogOutput("Would store the results.")
289