experiment_runner.py revision 6736a8e1044141d657aeac5916b32123da664e97
1# Copyright 2011-2015 Google Inc. All Rights Reserved.
2"""The experiment runner module."""
3import getpass
4import os
5import shutil
6import time
7
8import afe_lock_machine
9import test_flag
10
11from cros_utils import command_executer
12from cros_utils import logger
13from cros_utils.email_sender import EmailSender
14from cros_utils.file_utils import FileUtils
15
16import config
17from experiment_status import ExperimentStatus
18from results_cache import CacheConditions
19from results_cache import ResultsCache
20from results_report import HTMLResultsReport
21from results_report import TextResultsReport
22from results_report import JSONResultsReport
23from schedv2 import Schedv2
24
25
class ExperimentRunner(object):
  """Runs a crosperf experiment and reports its results.

  The runner optionally locks the experiment's machines through the AFE
  server, runs the experiment while periodically logging its status, and
  afterwards prints, stores and emails the results report.
  """

  # Minimum number of seconds between status checks in the _Run loop.
  STATUS_TIME_DELAY = 30
  # Seconds to sleep between polls of experiment.IsComplete() in _Run.
  THREAD_MONITOR_DELAY = 2

  def __init__(self,
               experiment,
               json_report,
               using_schedv2=False,
               log=None,
               cmd_exec=None):
    """Create a runner for `experiment`.

    Args:
      experiment: The experiment to run.
      json_report: If true, _StoreResults also writes a JSON results report.
      using_schedv2: If true, _Run attaches a Schedv2 scheduler to the
        experiment before running it.
      log: Logger to use; defaults to logger.GetLogger(experiment.log_dir).
      cmd_exec: Command executer to use; defaults to one built on the logger.
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != 'verbose':
      # Non-verbose runs check the status string more often. This shadows the
      # class attribute for this instance only.
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Return a list of all requested machines.

    Every label's remote list must be a subset of experiment.remote, so the
    global list already contains every requested machine (global and
    label-specific).

    Returns:
      experiment.remote itself (not a copy).

    Raises:
      AssertionError: if a label requests a machine that is not in
        experiment.remote (only when assertions are enabled).
    """
    machines = self._experiment.remote
    # All Label.remote is a sublist of experiment.remote.
    for label in self._experiment.labels:
      for remote in label.remote:
        assert remote in machines
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Update machine lists to contain only locked machines.

    Go through all the lists of requested machines, both global and
    label-specific requests, and remove any machine that we were not
    able to lock. The lists are filtered in place, so other references to
    the same list objects see the result.

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    # Rebuild each list with slice assignment instead of calling remove()
    # while iterating over it: removing during iteration skips the element
    # following each removed one, leaving unlocked machines behind.
    remote = self._experiment.remote
    remote[:] = [m for m in remote if m in locked_machines]

    for label in self._experiment.labels:
      label.remote[:] = [m for m in label.remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempt to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.
    In test mode every requested machine is treated as locked without
    contacting the server.

    Raises:
      RuntimeError: if no machine could be locked.
    """
    if test_flag.GetTestMode():
      self.locked_machines = self._GetMachineList()
      self._experiment.locked_machines = self.locked_machines
    else:
      lock_mgr = afe_lock_machine.AFELockManager(
          self._GetMachineList(),
          '',
          experiment.labels[0].chromeos_root,
          None,
          log=self.l,)
      # Register machines the lock manager does not know about yet.
      for m in lock_mgr.machines:
        if not lock_mgr.MachineIsKnown(m):
          lock_mgr.AddLocalMachine(m)
      machine_states = lock_mgr.GetMachineStates('lock')
      lock_mgr.CheckMachineLocks(machine_states, 'lock')
      self.locked_machines = lock_mgr.UpdateMachines(True)
      self._experiment.locked_machines = self.locked_machines
      # Drop every machine we failed to lock from all request lists.
      self._UpdateMachineList(self.locked_machines)
      self._experiment.machine_manager.RemoveNonLockedMachines(
          self.locked_machines)
      if not self.locked_machines:
        raise RuntimeError('Unable to lock any machines.')

  def _UnlockAllMachines(self, experiment):
    """Attempt to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock all of the machines
    requested for this crosperf run. It does nothing in test mode or when no
    machines were locked.
    """
    if not self.locked_machines or test_flag.GetTestMode():
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        '',
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,)
    machine_states = lock_mgr.GetMachineStates('unlock')
    lock_mgr.CheckMachineLocks(machine_states, 'unlock')
    lock_mgr.UpdateMachines(False)

  def _ClearCacheEntries(self, experiment):
    """Remove the cache write directory of every benchmark run, if present.

    Args:
      experiment: The experiment whose benchmark runs' cache dirs are removed.
    """
    for br in experiment.benchmark_runs:
      cache = ResultsCache()
      cache.Init(br.label.chromeos_image, br.label.chromeos_root,
                 br.benchmark.test_name, br.iteration, br.test_args,
                 br.profiler_args, br.machine_manager, br.machine,
                 br.label.board, br.cache_conditions, br._logger, br.log_level,
                 br.label, br.share_cache, br.benchmark.suite,
                 br.benchmark.show_all_results, br.benchmark.run_local)
      cache_dir = cache.GetCacheDirForWrite()
      if os.path.exists(cache_dir):
        self.l.LogOutput('Removing cache dir: %s' % cache_dir)
        shutil.rmtree(cache_dir)

  def _Run(self, experiment):
    """Run `experiment`, logging its status until it completes.

    Machines are locked before the run and unlocked afterwards (even on
    error) only when experiment.locks_dir is not set. On Ctrl-C or
    SystemExit the experiment is terminated, self._terminated is set, and
    the exception is re-raised.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      # NOTE(review): presumably CacheConditions.FALSE means cached results
      # must not be reused, hence their directories are removed up front.
      if CacheConditions.FALSE in experiment.cache_conditions:
        self._ClearCacheEntries(experiment)
      status = ExperimentStatus(experiment)
      # experiment.Run() appears to return before the experiment finishes;
      # completion is detected by polling IsComplete() below.
      experiment.Run()
      last_status_time = 0
      last_status_string = ''
      try:
        if experiment.log_level != 'verbose':
          self.l.LogStartDots()
        while not experiment.IsComplete():
          if last_status_time + self.STATUS_TIME_DELAY < time.time():
            last_status_time = time.time()
            border = '=============================='
            if experiment.log_level == 'verbose':
              self.l.LogOutput(border)
              self.l.LogOutput(status.GetProgressString())
              self.l.LogOutput(status.GetStatusString())
              self.l.LogOutput(border)
            else:
              # Only print the status when it changed; otherwise emit a dot
              # to show progress.
              current_status_string = status.GetStatusString()
              if current_status_string != last_status_string:
                self.l.LogEndDots()
                self.l.LogOutput(border)
                self.l.LogOutput(current_status_string)
                self.l.LogOutput(border)
                last_status_string = current_status_string
              else:
                self.l.LogAppendDot()
          time.sleep(self.THREAD_MONITOR_DELAY)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError('Ctrl-c pressed. Cleaning up...')
        experiment.Terminate()
        raise
      except SystemExit:
        self._terminated = True
        self.l.LogError('Unexpected exit. Cleaning up...')
        experiment.Terminate()
        raise
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _PrintTable(self, experiment):
    """Log the text results report of `experiment`."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _GetMessageBody(self, experiment):
    """Return the text report wrapped in a <pre> tag for HTML messages.

    The same body is emailed by _Email and stored by _StoreResults.
    """
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ('\nResults are stored in %s.\n' %
                    experiment.results_directory)
    return "<pre style='font-size: 13px'>%s</pre>" % text_report

  @staticmethod
  def _BenchmarkRunDirName(name):
    """Return `name` with every non-alphanumeric character removed.

    Used as the per-run results directory name. Unlike
    filter(str.isalnum, name), this always returns a string on both
    Python 2 and Python 3 (Python 3's filter returns a lazy iterator).
    """
    return ''.join(c for c in name if c.isalnum())

  def _Email(self, experiment):
    """Email the results report to the requested recipients.

    Mail is sent when at least one benchmark run was not a cache hit or when
    experiment.email_to is set, unless the 'no_email' config option is set.
    The current user is always added to the recipients; experiment.email_to
    itself is left unmodified.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not br.cache_hit for br in experiment.benchmark_runs)
    if ((not send_mail and not experiment.email_to) or
        config.GetConfig('no_email')):
      return

    label_names = [label.name for label in experiment.labels]
    subject = '%s: %s' % (experiment.name, ' vs. '.join(label_names))

    text_report = self._GetMessageBody(experiment)
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment('report.html', html_report)
    # Copy so that appending the current user does not mutate
    # experiment.email_to.
    email_to = list(experiment.email_to or [])
    email_to.append(getpass.getuser())
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type='html')

  def _StoreResults(self, experiment):
    """Write the experiment file, reports and per-run results to disk.

    The results directory is deleted and recreated first. Nothing is stored
    if the run was terminated.
    """
    if self._terminated:
      return
    results_directory = experiment.results_directory
    FileUtils().RmDir(results_directory)
    FileUtils().MkDirP(results_directory)
    self.l.LogOutput('Storing experiment file in %s.' % results_directory)
    experiment_file_path = os.path.join(results_directory, 'experiment.exp')
    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput('Storing results report in %s.' % results_directory)
    results_table_path = os.path.join(results_directory, 'results.html')
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      JSONResultsReport(experiment).GetReport(results_directory)
    FileUtils().WriteFile(results_table_path, report)

    self.l.LogOutput('Storing email message body in %s.' % results_directory)
    msg_file_path = os.path.join(results_directory, 'msg_body.html')
    FileUtils().WriteFile(msg_file_path, self._GetMessageBody(experiment))

    self.l.LogOutput('Storing results of each benchmark run.')
    for benchmark_run in experiment.benchmark_runs:
      if benchmark_run.result:
        benchmark_run_path = os.path.join(
            results_directory, self._BenchmarkRunDirName(benchmark_run.name))
        benchmark_run.result.CopyResultsTo(benchmark_run_path)
        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Run the experiment, then report on it.

    The results table is always printed, even if the run raised. Results are
    stored and emailed only when the run was not terminated.
    """
    try:
      self._Run(self._experiment)
    finally:
      # Always print the report at the end of the run.
      self._PrintTable(self._experiment)
      if not self._terminated:
        self._StoreResults(self._experiment)
        self._Email(self._experiment)
270
271
class MockExperimentRunner(ExperimentRunner):
  """ExperimentRunner stand-in that logs each stage instead of performing it.

  _Run, _PrintTable, _Email and _StoreResults are each replaced by a single
  LogOutput message. Because the base class's machine locking happens inside
  _Run, no machines are locked either.
  """

  def __init__(self, experiment, json_report):
    # Only the two required arguments are exposed; every other base-class
    # option keeps its default.
    super(MockExperimentRunner, self).__init__(
        experiment=experiment, json_report=json_report)

  def _Run(self, experiment):
    message = "Would run the following experiment: '%s'." % experiment.name
    self.l.LogOutput(message)

  def _PrintTable(self, experiment):
    del experiment  # Unused: the table is never built.
    self.l.LogOutput('Would print the experiment table.')

  def _Email(self, experiment):
    del experiment  # Unused: no mail is sent.
    self.l.LogOutput('Would send result email.')

  def _StoreResults(self, experiment):
    del experiment  # Unused: nothing is written to disk.
    self.l.LogOutput('Would store the results.')
290