experiment_runner.py revision c87cb38b44bdaba67d1415386fcc7ff6a97e3365
1#!/usr/bin/python
2
3# Copyright 2011 Google Inc. All Rights Reserved.
4
5"""The experiment runner module."""
6import getpass
7import os
8import random
9import shutil
10import sys
11import time
12import traceback
13
14import afe_lock_machine
15from machine_image_manager import MachineImageManager
16
17from collections import defaultdict
18from utils import command_executer
19from utils import logger
20from utils.email_sender import EmailSender
21from utils.file_utils import FileUtils
22from threading import Lock
23from threading import Thread
24
25import config
26from experiment_status import ExperimentStatus
27from results_cache import CacheConditions
28from results_cache import ResultsCache
29from results_report import HTMLResultsReport
30from results_report import TextResultsReport
31from results_report import JSONResultsReport
32
33
class ExperimentRunner(object):
  """Runs a crosperf experiment and reports its results.

  The runner optionally locks the requested machines through AFE, starts the
  experiment, logs progress until it completes, and then prints, stores and
  emails the results.
  """

  # Seconds between status reports; __init__ lowers it in non-verbose mode.
  STATUS_TIME_DELAY = 30
  # Seconds to sleep between completion checks in _Run.
  THREAD_MONITOR_DELAY = 2

  def __init__(self, experiment, json_report, using_schedv2=False, log=None,
               cmd_exec=None):
    """Initialize the runner.

    Args:
      experiment: the experiment to run.
      json_report: when true, _StoreResults also writes a JSON report.
      using_schedv2: when true, _Run installs a Schedv2 scheduler on the
        experiment (feature in progress).
      log: logger to use; defaults to logger.GetLogger(experiment.log_dir).
      cmd_exec: command executer; defaults to one created for the logger.
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != "verbose":
      # Report status more frequently when not in verbose mode.
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Return a list of all requested machines.

    Combines the global machine requests with every label-specific request.
    Order of first appearance is kept and duplicates are dropped. The
    experiment's own lists are not modified (a previous version appended the
    label requests to experiment.remote in place).
    """
    requested = list(self._experiment.remote)
    for l in self._experiment.labels:
      if l.remote:
        requested.extend(l.remote)

    machines = []
    seen = set()
    for m in requested:
      if m not in seen:
        seen.add(m)
        machines.append(m)
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Update machines lists to contain only locked machines.

    Go through all the lists of requested machines, both global and
    label-specific requests, and remove any machine that we were not
    able to lock. The lists are filtered in place (slice assignment), so any
    other holder of the same list objects sees the update.

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    # Rebuild rather than calling list.remove() while iterating, which would
    # skip the element following each removed one.
    remote = self._experiment.remote
    remote[:] = [m for m in remote if m in locked_machines]

    for l in self._experiment.labels:
      if l.remote:
        l.remote[:] = [m for m in l.remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempt to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.
    Machines that could not be locked are dropped from the experiment's
    machine lists and from its machine manager.

    Raises:
      RuntimeError: if no machine could be locked.
    """
    lock_mgr = afe_lock_machine.AFELockManager(
        self._GetMachineList(),
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    for m in lock_mgr.machines:
      if not lock_mgr.MachineIsKnown(m):
        lock_mgr.AddLocalMachine(m)
    machine_states = lock_mgr.GetMachineStates("lock")
    lock_mgr.CheckMachineLocks(machine_states, "lock")
    self.locked_machines = lock_mgr.UpdateMachines(True)
    self._experiment.locked_machines = self.locked_machines
    self._UpdateMachineList(self.locked_machines)
    self._experiment.machine_manager.RemoveNonLockedMachines(
        self.locked_machines)
    if not self.locked_machines:
      raise RuntimeError("Unable to lock any machines.")

  def _UnlockAllMachines(self, experiment):
    """Attempt to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock the machines this
    runner locked (self.locked_machines). It does nothing if none were locked.
    """
    if not self.locked_machines:
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    machine_states = lock_mgr.GetMachineStates("unlock")
    lock_mgr.CheckMachineLocks(machine_states, "unlock")
    lock_mgr.UpdateMachines(False)

  def _ClearCacheEntries(self, experiment):
    """Delete the cache directory each benchmark run would write to.

    Directories that do not exist yet (e.g. the first run of an experiment)
    are skipped instead of making shutil.rmtree raise.
    """
    for br in experiment.benchmark_runs:
      cache = ResultsCache()
      cache.Init(br.label.chromeos_image, br.label.chromeos_root,
                 br.benchmark.test_name, br.iteration, br.test_args,
                 br.profiler_args, br.machine_manager, br.machine,
                 br.label.board, br.cache_conditions, br._logger, br.log_level,
                 br.label, br.share_cache, br.benchmark.suite,
                 br.benchmark.show_all_results, br.benchmark.run_local)
      cache_dir = cache._GetCacheDirForWrite()
      if os.path.exists(cache_dir):
        self.l.LogOutput("Removing cache dir: %s" % cache_dir)
        shutil.rmtree(cache_dir)

  def _Run(self, experiment):
    """Run the experiment and log its status until it completes.

    Machines are locked first unless experiment.locks_dir is set, and are
    unlocked in a `finally` so they are released even on errors. Ctrl-C
    terminates the experiment and marks this runner as terminated.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      if CacheConditions.FALSE in experiment.cache_conditions:
        self._ClearCacheEntries(experiment)
      status = ExperimentStatus(experiment)
      experiment.Run()
      last_status_time = 0
      last_status_string = ""
      try:
        if experiment.log_level != "verbose":
          self.l.LogStartDots()
        while not experiment.IsComplete():
          if last_status_time + self.STATUS_TIME_DELAY < time.time():
            last_status_time = time.time()
            border = "=============================="
            if experiment.log_level == "verbose":
              self.l.LogOutput(border)
              self.l.LogOutput(status.GetProgressString())
              self.l.LogOutput(status.GetStatusString())
              self.l.LogOutput(border)
            else:
              # Only print the status when it changed; otherwise append a dot.
              current_status_string = status.GetStatusString()
              if current_status_string != last_status_string:
                self.l.LogEndDots()
                self.l.LogOutput(border)
                self.l.LogOutput(current_status_string)
                self.l.LogOutput(border)
                last_status_string = current_status_string
              else:
                self.l.LogAppendDot()
          time.sleep(self.THREAD_MONITOR_DELAY)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError("Ctrl-c pressed. Cleaning up...")
        experiment.Terminate()
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _PrintTable(self, experiment):
    """Log the text results report."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _GetMsgBody(self, experiment):
    """Return the HTML-wrapped text report used as the email body.

    The same body is sent by _Email and saved by _StoreResults.
    """
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    return "<pre style='font-size: 13px'>%s</pre>" % text_report

  def _Email(self, experiment):
    """Email the results to the current user plus experiment.email_to.

    Nothing is sent when the "no_email" config is set, or when every
    benchmark run was a cache hit and no explicit recipients were given.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not br.cache_hit for br in experiment.benchmark_runs)
    if ((not send_mail and not experiment.email_to)
        or config.GetConfig("no_email")):
      return

    label_names = [label.name for label in experiment.labels]
    subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

    text_report = self._GetMsgBody(experiment)
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment("report.html", html_report)
    email_to = [getpass.getuser()] + experiment.email_to
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type="html")

  def _StoreResults(self, experiment):
    """Write the experiment file, reports and per-run results to disk.

    The results directory is removed and recreated first. Does nothing if the
    run was terminated.
    """
    if self._terminated:
      return
    results_directory = experiment.results_directory
    FileUtils().RmDir(results_directory)
    FileUtils().MkDirP(results_directory)
    self.l.LogOutput("Storing experiment file in %s." % results_directory)
    experiment_file_path = os.path.join(results_directory,
                                        "experiment.exp")
    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput("Storing results report in %s." % results_directory)
    results_table_path = os.path.join(results_directory, "results.html")
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      JSONResultsReport(experiment).GetReport(results_directory)
    FileUtils().WriteFile(results_table_path, report)

    self.l.LogOutput("Storing email message body in %s." % results_directory)
    msg_file_path = os.path.join(results_directory, "msg_body.html")
    FileUtils().WriteFile(msg_file_path, self._GetMsgBody(experiment))

    self.l.LogOutput("Storing results of each benchmark run.")
    for benchmark_run in experiment.benchmark_runs:
      if benchmark_run.result:
        # Keep only alphanumeric characters so the run name is usable as a
        # directory name. A join works for both str and unicode names, unlike
        # filter(str.isalnum, ...).
        benchmark_run_name = "".join(
            c for c in benchmark_run.name if c.isalnum())
        benchmark_run_path = os.path.join(results_directory,
                                          benchmark_run_name)
        benchmark_run.result.CopyResultsTo(benchmark_run_path)
        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Run the experiment, print the table, then store and email results."""
    self._Run(self._experiment)
    self._PrintTable(self._experiment)
    if not self._terminated:
      self._StoreResults(self._experiment)
      self._Email(self._experiment)
262
class DutWorker(Thread):
    """Daemon thread that runs benchmark runs on a single dut.

    The worker repeatedly asks the scheduler for a benchmark run matching the
    dut's current label. When none is left, it asks for a new label and
    reimages the dut.
    """

    def __init__(self, dut, sched):
        super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
        self._dut = dut
        self._sched = sched
        self._stat_num_br_run = 0
        self._stat_num_reimage = 0
        self._stat_annotation = ""
        self._l = logger.GetLogger(self._sched._experiment.log_dir)
        self.daemon = True
        self._terminated = False
        self._active_br = None
        # Race condition accessing _active_br between _execute_benchmark_run and
        # terminate, so lock it up.
        self._active_br_lock = Lock()

    def terminate(self):
        """Stop this worker and terminate its active benchmark run, if any."""
        self._terminated = True
        with self._active_br_lock:
            if self._active_br is not None:
                # BenchmarkRun.Terminate() terminates any running testcase via
                # suite_runner.Terminate and updates timeline.
                self._active_br.Terminate()

    def run(self):
        """Do the "run-test->(optionally reimage)->run-test" chore.

        Note - 'br' below means 'benchmark_run'. The scheduler is always
        notified via dut_worker_finished when this thread exits, including
        when label detection or a benchmark run raises.
        """
        try:
            # Inside the try so a failure here still reaches the finally and
            # the scheduler does not wait forever for this worker.
            self._setup_dut_label()
            self._l.LogOutput("{} started.".format(self))
            while not self._terminated:
                br = self._sched.get_benchmark_run(self._dut)
                if br is None:
                    # No br left for this label. Considering reimaging.
                    label = self._sched.allocate_label(self._dut)
                    if label is None:
                        # No br even for other labels. We are done.
                        self._l.LogOutput("ImageManager found no label "
                                          "for dut, stopping working "
                                          "thread {}.".format(self))
                        break
                    if self._reimage(label):
                        # Reimage to run other br fails, dut is doomed, stop
                        # this thread.
                        self._l.LogWarning("Re-image failed, dut "
                                           "in an unstable state, stopping "
                                           "working thread {}.".format(self))
                        break
                else:
                    # Execute the br.
                    self._execute_benchmark_run(br)
        finally:
            self._stat_annotation = "finished"
            # Thread finishes. Notify scheduler that I'm done.
            self._sched.dut_worker_finished(self)

    def _reimage(self, label):
        """Reimage the dut with label's image.

        Args:
          label: the label to reimage onto the dut.

        Returns:
          0 if successful, otherwise 1. On success the dut's label is updated.
          Returns 1 without reimaging if the worker was terminated.
        """

        # Termination could happen anywhere, check it.
        if self._terminated:
            return 1

        self._l.LogOutput('Reimaging {} using {}'.format(self, label))
        self._stat_num_reimage += 1
        self._stat_annotation = 'reimaging using "{}"'.format(label.name)
        try:
            # Note, only 1 reimage at any given time, this is guaranteed in
            # ImageMachine, so no sync needed below.
            retval = self._sched._experiment.machine_manager.ImageMachine(
                self._dut, label)
        except Exception:
            # Report the cause instead of silently swallowing it; the caller
            # treats a non-zero return as a doomed dut.
            self._l.LogError("Exception while reimaging {}:\n{}".format(
                self, traceback.format_exc()))
            return 1
        if retval:
            return 1

        self._dut.label = label
        return 0

    def _execute_benchmark_run(self, br):
        """Execute a single benchmark_run.

        Returns immediately if the worker was terminated. Otherwise marks this
        thread as the br's owner, runs it, and always reports it finished to
        the experiment. Exceptions raised by br.run() propagate to the caller
        after that bookkeeping.
        """

        # Termination could happen anywhere, check it.
        if self._terminated:
            return

        self._l.LogOutput('{} started working on {}'.format(self, br))
        self._stat_num_br_run += 1
        self._stat_annotation = 'executing {}'.format(br)
        # Checked before the try so that a br owned by another thread is never
        # reported finished by this one.
        assert br.owner_thread is None
        br.owner_thread = self
        with self._active_br_lock:
            self._active_br = br
        # benchmark_run.run does not throw, but just play it safe here.
        try:
            br.run()
        finally:
            self._sched._experiment.BenchmarkRunFinished(br)
            with self._active_br_lock:
                self._active_br = None

    def _setup_dut_label(self):
        """Try to match dut image with a certain experiment label.

        Reads the image checksum file on the dut and, if it matches a label's
        checksum, sets the dut's label so that a reimage can be skipped. If
        the command fails or nothing matches, the label is left unchanged. If
        an exception is raised, it is logged and the label is set to None.
        """

        checksum_file = "/usr/local/osimage_checksum_file"
        try:
            rv, checksum, _ = command_executer.GetCommandExecuter().\
                CrosRunCommand(
                    "cat " + checksum_file,
                    return_output=True,
                    chromeos_root=self._sched._labels[0].chromeos_root,
                    machine=self._dut.name)
            if rv == 0:
                checksum = checksum.strip()
                for l in self._sched._labels:
                    if l.checksum == checksum:
                        self._l.LogOutput(
                            "Dut '{}' is pre-installed with '{}'".format(
                                self._dut.name, l))
                        self._dut.label = l
                        return
        except Exception:
            self._l.LogWarning(
                "Failed to determine image of dut '{}':\n{}".format(
                    self._dut.name, traceback.format_exc()))
            self._dut.label = None

    def __str__(self):
        return 'DutWorker[dut="{}", label="{}"]'.format(
            self._dut.name, self._dut.label.name if self._dut.label else "None")

    def dut(self):
        """Return the dut this worker drives."""
        return self._dut

    def status_str(self):
        """Report thread status."""

        return ('Worker thread "{}", label="{}", benchmark_run={}, '
                'reimage={}, now {}'.format(
                    self._dut.name,
                    'None' if self._dut.label is None else self._dut.label.name,
                    self._stat_num_br_run,
                    self._stat_num_reimage,
                    self._stat_annotation))
423
424
class Schedv2(object):
    """New scheduler for crosperf.

    Starts one DutWorker thread per dut, hands out benchmark runs grouped by
    label, and delegates label (image) allocation to MachineImageManager.
    """

    def __init__(self, experiment):
        self._experiment = experiment
        self._l = logger.GetLogger(experiment.log_dir)

        # Create shortcuts to nested data structure. "_duts" points to a list
        # of locked machines. _labels points to a list of all labels.
        self._duts = self._experiment.machine_manager._all_machines
        self._labels = self._experiment.labels

        # Mapping from label to a list of benchmark_runs.
        self._label_brl_map = {l: [] for l in self._labels}
        for br in self._experiment.benchmark_runs:
            assert br.label in self._label_brl_map
            self._label_brl_map[br.label].append(br)

        # Use machine image manager to calculate initial label allocation.
        self._mim = MachineImageManager(self._labels, self._duts)
        self._mim.compute_initial_allocation()

        # Create worker thread, 1 per dut.
        self._active_workers = [DutWorker(dut, self) for dut in self._duts]
        self._finished_workers = []

        # Bookkeeping for synchronization.
        self._workers_lock = Lock()
        # Per-object locks. Locks for every known label are created here,
        # before any worker starts, so worker threads never depend on the
        # defaultdict's missing-key path (create + insert), which may not be
        # atomic across threads.
        self._lock_map = defaultdict(Lock)
        for label in self._labels:
            self._lock_map[label] = Lock()

        # Termination flag.
        self._terminated = False

    def run_sched(self):
        """Start all dut worker threads and return immediately."""
        for worker in self._active_workers:
            worker.start()

    def get_benchmark_run(self, dut):
        """Get a benchmark_run (br) object for a certain dut.

        Arguments:
          dut: the dut for which a br is returned.

        Returns:
          A br with its label matching that of the dut. If no such br could be
          found, return None (this usually means a reimage is required for the
          dut). Also returns None once the scheduler is terminated.
        """

        # If terminated, stop providing any br.
        if self._terminated:
            return None

        # If dut bears an unrecognized label, return None.
        if dut.label is None:
            return None

        # If br list for the dut's label is empty (that means all brs for this
        # label have been done), return None.
        with self._lock_on(dut.label):
            brl = self._label_brl_map[dut.label]
            if not brl:
                return None
            # Return the first br.
            return brl.pop(0)

    def allocate_label(self, dut):
        """Allocate a label to a dut.

        The work is delegated to MachineImageManager.

        The dut_worker calling this method is responsible for reimaging the
        dut to this label.

        Arguments:
          dut: the dut that needs a new label.

        Returns:
          The label or None (always None once the scheduler is terminated).
        """

        if self._terminated:
            return None

        return self._mim.allocate(dut, self)

    def dut_worker_finished(self, dut_worker):
        """Notify schedv2 that the dut_worker thread finished.

        Arguments:
          dut_worker: the thread that is about to end.
        """

        self._l.LogOutput("{} finished.".format(dut_worker))
        with self._workers_lock:
            self._active_workers.remove(dut_worker)
            self._finished_workers.append(dut_worker)

    def is_complete(self):
        """Return True once every dut worker has finished."""
        return not self._active_workers

    def _lock_on(self, object):
        """Return the lock guarding the given object (typically a label)."""
        return self._lock_map[object]

    def terminate(self):
        """Mark flag so we stop providing br/reimages.

        Also terminate each DutWorker, so they refuse to execute br or reimage.
        """

        self._terminated = True
        # Workers remove themselves from _active_workers concurrently (see
        # dut_worker_finished). Iterate a snapshot taken under the lock so no
        # worker is skipped. Terminate outside the lock to avoid holding it
        # while a benchmark run is being stopped.
        with self._workers_lock:
            workers = list(self._active_workers)
        for dut_worker in workers:
            dut_worker.terminate()

    def threads_status_as_string(self):
        """Report the dut worker threads status."""

        with self._workers_lock:
            active = list(self._active_workers)
            finished = list(self._finished_workers)

        lines = [
            "{} active threads, {} finished threads.".format(
                len(active), len(finished)),
            "  Active threads:",
        ]
        lines.extend("    " + dw.status_str() for dw in active)
        if finished:
            lines.append("  Finished threads:")
            lines.extend("    " + dw.status_str() for dw in finished)
        return "\n".join(lines)
551
552
class MockExperimentRunner(ExperimentRunner):
  """Mocked ExperimentRunner for testing.

  Overrides every step that would touch machines, disk or email with a log
  message describing what would have happened.
  """

  def __init__(self, experiment, json_report=False):
    """Create a mock runner.

    Args:
      experiment: the experiment that would be run.
      json_report: forwarded to ExperimentRunner, which requires it. Defaults
        to False so the mock can still be built from just an experiment
        (previously the base __init__ was called without it and raised
        TypeError).
    """
    super(MockExperimentRunner, self).__init__(experiment, json_report)

  def _Run(self, experiment):
    self.l.LogOutput("Would run the following experiment: '%s'." %
                     experiment.name)

  def _PrintTable(self, experiment):
    self.l.LogOutput("Would print the experiment table.")

  def _Email(self, experiment):
    self.l.LogOutput("Would send result email.")

  def _StoreResults(self, experiment):
    self.l.LogOutput("Would store the results.")
571