experiment_runner.py revision 68c1d611c1ab7d92199f12456b097cd19e18ac1c
1#!/usr/bin/python
2
3# Copyright 2011 Google Inc. All Rights Reserved.
4
5"""The experiment runner module."""
6import getpass
7import os
8import random
9import shutil
10import sys
11import time
12import traceback
13
14import afe_lock_machine
15from machine_image_manager import MachineImageManager
16
17from collections import defaultdict
18from utils import command_executer
19from utils import logger
20from utils.email_sender import EmailSender
21from utils.file_utils import FileUtils
22from threading import Lock
23from threading import Thread
24
25import config
26from experiment_status import ExperimentStatus
27from results_cache import CacheConditions
28from results_cache import ResultsCache
29from results_report import HTMLResultsReport
30from results_report import TextResultsReport
31from results_report import JSONResultsReport
32
33
class ExperimentRunner(object):
  """Drive one experiment: lock machines, run, report, store and email.

  Attributes:
    STATUS_TIME_DELAY: Minimum number of seconds between two status reports
      while the experiment is running. Lowered to 10 per instance when the
      experiment's log level is not "verbose".
    THREAD_MONITOR_DELAY: Number of seconds slept between two completion
      polls of the experiment.
  """

  STATUS_TIME_DELAY = 30
  THREAD_MONITOR_DELAY = 2

  def __init__(self, experiment, json_report, using_schedv2=False, log=None,
               cmd_exec=None):
    """Initialize the runner.

    Args:
      experiment: The experiment object to run.
      json_report: If true, a JSON report is written next to the HTML one
        when results are stored.
      using_schedv2: If true, a Schedv2 scheduler is created and attached to
        the experiment before it is run (feature in progress).
      log: Optional logger; defaults to the logger for experiment.log_dir.
      cmd_exec: Optional command executer; defaults to one bound to the
        logger.
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != "verbose":
      # Shadows the class attribute for this instance only.
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Return a list of all requested machines.

    Create a list of all the requested machines, both global requests and
    label-specific requests, and return the list.

    Returns:
      A new list; the experiment's own machine lists are left untouched.
    """
    # Copy first: extending the experiment's list in place would leak the
    # label-specific machines into the global request (and duplicate them on
    # every call).
    machines = list(self._experiment.remote or [])
    for l in self._experiment.labels:
      if l.remote:
        machines += l.remote
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Update machines lists to contain only locked machines.

    Go through all the lists of requested machines, both global and
    label-specific requests, and remove any machine that we were not
    able to lock. The lists are filtered in place, so other holders of the
    same list objects see the update.

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    remote_lists = [self._experiment.remote]
    remote_lists.extend(l.remote for l in self._experiment.labels)
    for remote in remote_lists:
      if not remote:
        # Nothing requested here (None or empty); nothing to filter.
        continue
      # Slice assignment instead of remove()-while-iterating, which skips the
      # element following every removed one.
      remote[:] = [m for m in remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempt to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.

    Args:
      experiment: The experiment whose first label supplies the chromeos
        root handed to the lock manager.

    Raises:
      RuntimeError: If no machine at all could be locked.
    """
    lock_mgr = afe_lock_machine.AFELockManager(
        self._GetMachineList(),
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    for m in lock_mgr.machines:
      if not lock_mgr.MachineIsKnown(m):
        lock_mgr.AddLocalMachine(m)
    machine_states = lock_mgr.GetMachineStates("lock")
    lock_mgr.CheckMachineLocks(machine_states, "lock")
    self.locked_machines = lock_mgr.UpdateMachines(True)
    self._experiment.locked_machines = self.locked_machines
    self._UpdateMachineList(self.locked_machines)
    self._experiment.machine_manager.RemoveNonLockedMachines(
        self.locked_machines)
    if not self.locked_machines:
      raise RuntimeError("Unable to lock any machines.")

  def _UnlockAllMachines(self, experiment):
    """Attempt to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock all of the machines
    requested for this crosperf run. It is a no-op when nothing was locked.

    Args:
      experiment: The experiment whose first label supplies the chromeos
        root handed to the lock manager.
    """
    if not self.locked_machines:
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    machine_states = lock_mgr.GetMachineStates("unlock")
    lock_mgr.CheckMachineLocks(machine_states, "unlock")
    lock_mgr.UpdateMachines(False)

  def _ClearCacheEntries(self, experiment):
    """Delete the results-cache directory of every benchmark run.

    Args:
      experiment: The experiment whose benchmark_runs are to be purged from
        the cache.
    """
    for br in experiment.benchmark_runs:
      cache = ResultsCache()
      cache.Init(br.label.chromeos_image, br.label.chromeos_root,
                 br.benchmark.test_name, br.iteration, br.test_args,
                 br.profiler_args, br.machine_manager, br.label.board,
                 br.cache_conditions, br._logger, br.log_level, br.label,
                 br.share_cache, br.benchmark.suite,
                 br.benchmark.show_all_results, br.benchmark.run_local)
      cache_dir = cache._GetCacheDirForWrite()
      # A run that was never cached has no directory; rmtree would raise and
      # abort the whole experiment before it starts.
      if os.path.isdir(cache_dir):
        self.l.LogOutput("Removing cache dir: %s" % cache_dir)
        shutil.rmtree(cache_dir)

  def _Run(self, experiment):
    """Run the experiment and report progress until it completes.

    Machines are locked through AFE first unless experiment.locks_dir is
    set, and are always unlocked again on the way out. Ctrl-c terminates the
    experiment and marks this runner as terminated.

    Args:
      experiment: The experiment to run.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      if CacheConditions.FALSE in experiment.cache_conditions:
        self._ClearCacheEntries(experiment)
      status = ExperimentStatus(experiment)
      experiment.Run()
      last_status_time = 0
      last_status_string = ""
      border = "=============================="
      verbose = experiment.log_level == "verbose"
      try:
        if not verbose:
          self.l.LogStartDots()
        while not experiment.IsComplete():
          if last_status_time + self.STATUS_TIME_DELAY < time.time():
            last_status_time = time.time()
            if verbose:
              self.l.LogOutput(border)
              self.l.LogOutput(status.GetProgressString())
              self.l.LogOutput(status.GetStatusString())
              self.l.LogOutput(border)
            else:
              # Non-verbose: print the status only when it changed, otherwise
              # just append a progress dot.
              current_status_string = status.GetStatusString()
              if current_status_string != last_status_string:
                self.l.LogEndDots()
                self.l.LogOutput(border)
                self.l.LogOutput(current_status_string)
                self.l.LogOutput(border)
                last_status_string = current_status_string
              else:
                self.l.LogAppendDot()
          time.sleep(self.THREAD_MONITOR_DELAY)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError("Ctrl-c pressed. Cleaning up...")
        experiment.Terminate()
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _PrintTable(self, experiment):
    """Log the text results report of the experiment."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _GetEmailBody(self, experiment):
    """Return the HTML-wrapped text report used as the email message body."""
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    return "<pre style='font-size: 13px'>%s</pre>" % text_report

  def _Email(self, experiment):
    """Email the results to the current user and experiment.email_to.

    Nothing is sent when the "no_email" config is set, or when every
    benchmark run was a cache hit and no explicit recipients were given.

    Args:
      experiment: The finished experiment to report on.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not benchmark_run.cache_hit
                    for benchmark_run in experiment.benchmark_runs)
    if ((not send_mail and not experiment.email_to) or
        config.GetConfig("no_email")):
      return

    label_names = [label.name for label in experiment.labels]
    subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

    text_report = self._GetEmailBody(experiment)
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment("report.html", html_report)
    # email_to may be unset (the check above tolerates that), so normalize it
    # before concatenating.
    email_to = [getpass.getuser()] + list(experiment.email_to or [])
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type="html")

  def _StoreResults(self, experiment):
    """Write the experiment file, reports and per-run results to disk.

    The results directory is removed and recreated first. Nothing is stored
    if the run was terminated.

    Args:
      experiment: The finished experiment whose results are stored.
    """
    if self._terminated:
      return
    results_directory = experiment.results_directory
    FileUtils().RmDir(results_directory)
    FileUtils().MkDirP(results_directory)
    self.l.LogOutput("Storing experiment file in %s." % results_directory)
    experiment_file_path = os.path.join(results_directory,
                                        "experiment.exp")
    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput("Storing results report in %s." % results_directory)
    results_table_path = os.path.join(results_directory, "results.html")
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      JSONResultsReport(experiment).GetReport(results_directory)
    FileUtils().WriteFile(results_table_path, report)

    self.l.LogOutput("Storing email message body in %s." % results_directory)
    msg_file_path = os.path.join(results_directory, "msg_body.html")
    FileUtils().WriteFile(msg_file_path, self._GetEmailBody(experiment))

    self.l.LogOutput("Storing results of each benchmark run.")
    for benchmark_run in experiment.benchmark_runs:
      if benchmark_run.result:
        # Keep only alphanumeric characters for the directory name. A join
        # over the characters yields a string on every Python version,
        # whereas filter() returns an iterator on Python 3.
        benchmark_run_name = "".join(
            c for c in benchmark_run.name if c.isalnum())
        benchmark_run_path = os.path.join(results_directory,
                                          benchmark_run_name)
        benchmark_run.result.CopyResultsTo(benchmark_run_path)
        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Run the experiment, print the result table, then store and email.

    Storing and emailing are skipped when the run was terminated.
    """
    self._Run(self._experiment)
    self._PrintTable(self._experiment)
    if not self._terminated:
      self._StoreResults(self._experiment)
      self._Email(self._experiment)
262
class DutWorker(Thread):
    """Worker thread that runs benchmark_runs on a single dut.

    The worker repeatedly asks the scheduler for a benchmark_run matching the
    dut's current label; when none is left it asks for another label, reimages
    the dut to it and carries on.
    """

    def __init__(self, dut, sched):
        """Create a daemon worker thread for one dut.

        Args:
          dut: the machine this worker drives.
          sched: the scheduler that hands out benchmark_runs and labels.
        """
        super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
        self._dut = dut
        self._sched = sched
        self._stat_num_br_run = 0
        self._stat_num_reimage = 0
        self._stat_annotation = ""
        self._l = logger.GetLogger(self._sched._experiment.log_dir)
        self.daemon = True
        self._terminated = False
        self._active_br = None
        # Race condition accessing _active_br between _execute_benchmark_run and
        # _terminate, so lock it up.
        self._active_br_lock = Lock()

    def terminate(self):
        """Stop taking new work and terminate the running benchmark_run."""
        self._terminated = True
        with self._active_br_lock:
            if self._active_br is not None:
                # BenchmarkRun.Terminate() terminates any running testcase via
                # suite_runner.Terminate and updates timeline.
                self._active_br.Terminate()

    def run(self):
        """Do the "run-test->(optionally reimage)->run-test" chore.

        Note - 'br' below means 'benchmark_run'.
        """

        try:
            # Inside the try so that the scheduler is notified in the finally
            # clause no matter what happens while probing the dut.
            self._setup_dut_label()
            self._l.LogOutput("{} started.".format(self))
            while not self._terminated:
                br = self._sched.get_benchmark_run(self._dut)
                if br is None:
                    # No br left for this label. Considering reimaging.
                    label = self._sched.allocate_label(self._dut)
                    if label is None:
                        # No br even for other labels. We are done.
                        self._l.LogOutput("ImageManager found no label "
                                          "for dut, stopping working "
                                          "thread {}.".format(self))
                        break
                    if self._reimage(label):
                        # Reimage to run other br fails, dut is doomed, stop
                        # this thread.
                        self._l.LogWarning("Re-image failed, dut "
                                           "in an unstable state, stopping "
                                           "working thread {}.".format(self))
                        break
                else:
                    # Execute the br.
                    self._execute_benchmark_run(br)
        finally:
            self._stat_annotation = "finished"
            # Thread finishes. Notify scheduler that I'm done.
            self._sched.dut_worker_finished(self)

    def _reimage(self, label):
        """Reimage image to label.

        Args:
          label: the label to reimage onto dut.

        Returns:
          0 if successful, otherwise 1 (including when the worker has been
          terminated or ImageMachine raised).
        """

        # Termination could happen anywhere, check it.
        if self._terminated:
            return 1

        self._l.LogOutput('Reimaging {} using {}'.format(self, label))
        self._stat_num_reimage += 1
        self._stat_annotation = 'reimaging using "{}"'.format(label.name)
        try:
            # Note, only 1 reimage at any given time, this is guaranteed in
            # ImageMachine, so no sync needed below.
            retval = self._sched._experiment.machine_manager.ImageMachine(
                self._dut, label)
        except Exception:
            # Still a failed reimage for the caller, but keep the reason in
            # the log instead of silently discarding it.
            self._l.LogError('Exception while reimaging {}:\n{}'.format(
                self, traceback.format_exc()))
            return 1

        if retval:
            return 1

        self._dut.label = label
        return 0

    def _execute_benchmark_run(self, br):
        """Execute a single benchmark_run.

        The br is always reported to the experiment as finished and cleared
        as the active br, even if br.run() raises; in that case the exception
        propagates to the caller after the bookkeeping.

        Args:
          br: the benchmark_run to execute.
        """

        # Termination could happen anywhere, check it.
        if self._terminated:
            return

        self._l.LogOutput('{} started working on {}'.format(self, br))
        self._stat_num_br_run += 1
        self._stat_annotation = 'executing {}'.format(br)
        # benchmark_run.run is not expected to throw, but just play it safe
        # here.
        try:
            assert br.owner_thread is None
            br.owner_thread = self
            with self._active_br_lock:
                self._active_br = br
            br.run()
        finally:
            self._sched._experiment.BenchmarkRunFinished(br)
            with self._active_br_lock:
                self._active_br = None

    def _setup_dut_label(self):
        """Try to match dut image with a certain experiment label.

        If such match is found, we just skip doing reimage and jump to execute
        some benchmark_runs. The dut's label is left unchanged when the
        checksum cannot be read or matches no label, and is reset to None
        when reading it raises.
        """

        checksum_file = "/usr/local/osimage_checksum_file"
        try:
            ce = command_executer.GetCommandExecuter()
            rv, checksum, _ = ce.CrosRunCommand(
                "cat " + checksum_file,
                return_output=True,
                chromeos_root=self._sched._labels[0].chromeos_root,
                machine=self._dut.name)
            if rv == 0:
                checksum = checksum.strip()
                for l in self._sched._labels:
                    if l.checksum == checksum:
                        self._l.LogOutput(
                            "Dut '{}' is pre-installed with '{}'".format(
                                self._dut.name, l))
                        self._dut.label = l
                        return
        except Exception:
            traceback.print_exc(file=sys.stdout)
            self._dut.label = None

    def __str__(self):
        return 'DutWorker[dut="{}", label="{}"]'.format(
            self._dut.name, self._dut.label.name if self._dut.label else "None")

    def dut(self):
        """Return the dut this worker drives."""
        return self._dut

    def status_str(self):
        """Report thread status."""

        return ('Worker thread "{}", label="{}", benchmark_run={}, '
                'reimage={}, now {}'.format(
                    self._dut.name,
                    'None' if self._dut.label is None else self._dut.label.name,
                    self._stat_num_br_run,
                    self._stat_num_reimage,
                    self._stat_annotation))
423
424
class Schedv2(object):
    """New scheduler for crosperf.

    Owns one DutWorker thread per dut, hands out benchmark_runs grouped by
    label, and delegates label (re)allocation to MachineImageManager.
    """

    def __init__(self, experiment):
        """Build the label->benchmark_run map and one worker per dut.

        Args:
          experiment: the experiment to schedule.
        """
        self._experiment = experiment
        self._l = logger.GetLogger(experiment.log_dir)

        # Create shortcuts to nested data structure. "_duts" points to a list of
        # locked machines. _labels points to a list of all labels.
        self._duts = self._experiment.machine_manager._all_machines
        self._labels = self._experiment.labels

        # Mapping from label to a list of benchmark_runs.
        self._label_brl_map = {l: [] for l in self._labels}
        for br in self._experiment.benchmark_runs:
            assert br.label in self._label_brl_map
            self._label_brl_map[br.label].append(br)

        # Bookkeeping for synchronization. The per-label locks are created up
        # front: creating them lazily from several worker threads could hand
        # two threads different locks for the same label.
        self._workers_lock = Lock()
        self._lock_map = defaultdict(Lock)
        for l in self._labels:
            self._lock_map[l] = Lock()

        # Termination flag.
        self._terminated = False

        # Use machine image manager to calculate initial label allocation.
        self._mim = MachineImageManager(self._labels, self._duts)
        self._mim.compute_initial_allocation()

        # Create worker thread, 1 per dut.
        self._active_workers = [DutWorker(dut, self) for dut in self._duts]
        self._finished_workers = []

    def _check_machines_are_same(self):
        """Check if all machines are the same."""

        for l in self._labels:
            if not self._experiment.machine_manager.AreAllMachineSame(l):
                self._l.LogError('All machines are NOT same for '
                                 'label "{}" ...'.format(l.name))
                return False
            self._l.LogOutput('Machines are same for '
                              'label "{}" ...'.format(l.name))
        return True

    def run_sched(self):
        """Start all dut worker threads and return immediately.

        Raises:
          RuntimeError: if the machines of some label are not all the same.
        """

        if not self._check_machines_are_same():
            raise RuntimeError("All machines are not same.")

        # Iterate over a copy: a worker that finishes quickly removes itself
        # from _active_workers while we are still starting the others.
        for w in list(self._active_workers):
            w.start()

    def get_benchmark_run(self, dut):
        """Get a benchmark_run (br) object for a certain dut.

        Arguments:
          dut: the dut for which a br is returned.

        Returns:
          A br with its label matching that of the dut. If no such br could be
          found, return None (this usually means a reimage is required for the
          dut).
        """

        # If terminated, stop providing any br.
        if self._terminated:
            return None

        # If dut bears an unrecognized label, return None.
        if dut.label is None:
            return None

        # If br list for the dut's label is empty (that means all brs for this
        # label have been done) , return None.
        with self._lock_on(dut.label):
            brl = self._label_brl_map[dut.label]
            if not brl:
                return None
            # Return the first br.
            return brl.pop(0)

    def allocate_label(self, dut):
        """Allocate a label to a dut.

        The work is delegated to MachineImageManager.

        The dut_worker calling this method is responsible for reimage the dut to
        this label.

        Arguments:
          dut: the dut that needs a new label.

        Returns:
          The label or None.
        """

        if self._terminated:
            return None

        return self._mim.allocate(dut)

    def dut_worker_finished(self, dut_worker):
        """Notify schedv2 that the dut_worker thread finished.

        Arguments:
          dut_worker: the thread that is about to end.
        """

        self._l.LogOutput("{} finished.".format(dut_worker))
        with self._workers_lock:
            self._active_workers.remove(dut_worker)
            self._finished_workers.append(dut_worker)

    def is_complete(self):
        """Return True once no worker thread is active any more."""
        return len(self._active_workers) == 0

    def _lock_on(self, object):
        """Return the lock guarding the benchmark_run list of a label."""
        return self._lock_map[object]

    def terminate(self):
        """Mark flag so we stop providing br/reimages.

        Also terminate each DutWorker, so they refuse to execute br or reimage.
        """

        self._terminated = True
        # Work on a snapshot: workers remove themselves from _active_workers
        # as they finish, and mutating a list while iterating it skips
        # entries. The lock is released before calling terminate() so that
        # finishing workers are not blocked in dut_worker_finished.
        with self._workers_lock:
            workers = list(self._active_workers)
        for dut_worker in workers:
            dut_worker.terminate()

    def threads_status_as_string(self):
        """Report the dut worker threads status."""

        with self._workers_lock:
            active = list(self._active_workers)
            finished = list(self._finished_workers)

        lines = [
            "{} active threads, {} finished threads.".format(
                len(active), len(finished)),
            "  Active threads:",
        ]
        lines.extend('    ' + dw.status_str() for dw in active)
        if finished:
            lines.append("  Finished threads:")
            lines.extend('    ' + dw.status_str() for dw in finished)
        return "\n".join(lines)
567
568
class MockExperimentRunner(ExperimentRunner):
  """Mocked ExperimentRunner for testing.

  Every stage of the run is replaced by a log line describing what the real
  runner would have done.
  """

  def __init__(self, experiment, json_report=False):
    """Initialize the mock runner.

    Args:
      experiment: The experiment that would be run.
      json_report: Forwarded to ExperimentRunner, whose constructor requires
        it; defaults to False so existing one-argument callers keep working.
    """
    super(MockExperimentRunner, self).__init__(experiment, json_report)

  def _Run(self, experiment):
    """Log the experiment that would be run instead of running it."""
    self.l.LogOutput("Would run the following experiment: '%s'." %
                     experiment.name)

  def _PrintTable(self, experiment):
    """Log instead of printing the result table."""
    self.l.LogOutput("Would print the experiment table.")

  def _Email(self, experiment):
    """Log instead of sending the result email."""
    self.l.LogOutput("Would send result email.")

  def _StoreResults(self, experiment):
    """Log instead of storing the results."""
    self.l.LogOutput("Would store the results.")
587