experiment_runner.py revision 245e0635a5ae426c5345a06baf807940d1784125
1#!/usr/bin/python
2
3# Copyright 2011 Google Inc. All Rights Reserved.
4
5"""The experiment runner module."""
6import getpass
7import os
8import random
9import sys
10import time
11import traceback
12
13import afe_lock_machine
14from machine_image_manager import MachineImageManager
15
16from collections import defaultdict
17from utils import command_executer
18from utils import logger
19from utils.email_sender import EmailSender
20from utils.file_utils import FileUtils
21from threading import Lock
22from threading import Thread
23
24import config
25from experiment_status import ExperimentStatus
26from results_report import HTMLResultsReport
27from results_report import TextResultsReport
28from results_report import JSONResultsReport
29
30
class ExperimentRunner(object):
  """Drive one crosperf experiment: lock machines, run, report, unlock.

  Run() is the public entry point.  The underscore-prefixed steps are kept as
  separate methods so that subclasses (e.g. a mock runner) can override them.
  """

  # Seconds between two status reports while the experiment is running.
  STATUS_TIME_DELAY = 30
  # Seconds to sleep between two completion checks.
  THREAD_MONITOR_DELAY = 2

  def __init__(self, experiment, json_report, using_schedv2=False, log=None,
               cmd_exec=None):
    """Set up the runner.

    Args:
      experiment: The experiment object to run.
      json_report: If true, _StoreResults also emits a JSON results report.
      using_schedv2: If true, _Run creates a Schedv2 scheduler and hands it
        to the experiment (feature in progress).
      log: Optional logger; defaults to logger.GetLogger(experiment.log_dir).
      cmd_exec: Optional command executer; defaults to one built on the
        logger.
    """
    self._experiment = experiment
    self.l = log or logger.GetLogger(experiment.log_dir)
    self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l)
    self._terminated = False
    self.json_report = json_report
    self.locked_machines = []
    if experiment.log_level != "verbose":
      # Shadows the class-level default on this instance only.
      self.STATUS_TIME_DELAY = 10

    # Setting this to True will use crosperf sched v2 (feature in progress).
    self._using_schedv2 = using_schedv2

  def _GetMachineList(self):
    """Return a new list of all requested machines.

    The list holds the global requests followed by every label-specific
    request.  It is a fresh list: the experiment's own 'remote' list is not
    modified, so calling this repeatedly never accumulates duplicates there.
    """
    machines = list(self._experiment.remote or [])
    for label in self._experiment.labels:
      if label.remote:
        machines.extend(label.remote)
    return machines

  def _UpdateMachineList(self, locked_machines):
    """Update machines lists to contain only locked machines.

    Go through all the lists of requested machines, both global and
    label-specific requests, and drop any machine that we were not able to
    lock.  The lists are filtered in place (slice assignment), so other
    holders of the same list objects see the change.  Filtering builds a new
    sequence first rather than calling remove() during iteration, which
    would skip the element following each removed one.

    Args:
      locked_machines: A list of the machines we successfully locked.
    """
    remote = self._experiment.remote
    remote[:] = [m for m in remote if m in locked_machines]

    for label in self._experiment.labels:
      # A label may carry no machine list of its own.
      if label.remote:
        label.remote[:] = [m for m in label.remote if m in locked_machines]

  def _LockAllMachines(self, experiment):
    """Attempt to globally lock all of the machines requested for run.

    This method will use the AFE server to globally lock all of the machines
    requested for this crosperf run, to prevent any other crosperf runs from
    being able to update/use the machines while this experiment is running.

    Args:
      experiment: The experiment whose first label supplies chromeos_root.

    Raises:
      RuntimeError: If not a single machine could be locked.
    """
    lock_mgr = afe_lock_machine.AFELockManager(
        self._GetMachineList(),
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    for m in lock_mgr.machines:
      if not lock_mgr.MachineIsKnown(m):
        lock_mgr.AddLocalMachine(m)
    machine_states = lock_mgr.GetMachineStates("lock")
    lock_mgr.CheckMachineLocks(machine_states, "lock")
    self.locked_machines = lock_mgr.UpdateMachines(True)
    self._experiment.locked_machines = self.locked_machines
    self._UpdateMachineList(self.locked_machines)
    self._experiment.machine_manager.RemoveNonLockedMachines(
        self.locked_machines)
    if not self.locked_machines:
      raise RuntimeError("Unable to lock any machines.")

  def _UnlockAllMachines(self, experiment):
    """Attempt to globally unlock all of the machines requested for run.

    The method will use the AFE server to globally unlock all of the machines
    that this runner recorded as locked.  Does nothing when no machine is
    recorded as locked.

    Args:
      experiment: The experiment whose first label supplies chromeos_root.
    """
    if not self.locked_machines:
      return

    lock_mgr = afe_lock_machine.AFELockManager(
        self.locked_machines,
        "",
        experiment.labels[0].chromeos_root,
        None,
        log=self.l,
    )
    machine_states = lock_mgr.GetMachineStates("unlock")
    lock_mgr.CheckMachineLocks(machine_states, "unlock")
    lock_mgr.UpdateMachines(False)

  def _Run(self, experiment):
    """Run the experiment and report progress until it completes.

    Machines are locked through AFE only when the experiment has no
    locks_dir; in that case they are unlocked again in the finally clause,
    whatever happens in between.  Ctrl-c during the wait loop marks the
    runner as terminated and terminates the experiment.

    Args:
      experiment: The experiment to run.
    """
    try:
      if not experiment.locks_dir:
        self._LockAllMachines(experiment)
      if self._using_schedv2:
        schedv2 = Schedv2(experiment)
        experiment.set_schedv2(schedv2)
      status = ExperimentStatus(experiment)
      experiment.Run()
      last_status_time = 0
      last_status_string = ""
      try:
        if experiment.log_level != "verbose":
          self.l.LogStartDots()
        while not experiment.IsComplete():
          if last_status_time + self.STATUS_TIME_DELAY < time.time():
            last_status_time = time.time()
            border = "=============================="
            if experiment.log_level == "verbose":
              self.l.LogOutput(border)
              self.l.LogOutput(status.GetProgressString())
              self.l.LogOutput(status.GetStatusString())
              self.l.LogOutput(border)
            else:
              # Non-verbose: print the status only when it changed,
              # otherwise just append a progress dot.
              current_status_string = status.GetStatusString()
              if current_status_string != last_status_string:
                self.l.LogEndDots()
                self.l.LogOutput(border)
                self.l.LogOutput(current_status_string)
                self.l.LogOutput(border)
                last_status_string = current_status_string
              else:
                self.l.LogAppendDot()
          time.sleep(self.THREAD_MONITOR_DELAY)
      except KeyboardInterrupt:
        self._terminated = True
        self.l.LogError("Ctrl-c pressed. Cleaning up...")
        experiment.Terminate()
    finally:
      if not experiment.locks_dir:
        self._UnlockAllMachines(experiment)

  def _PrintTable(self, experiment):
    """Log the plain-text results report of the experiment."""
    self.l.LogOutput(TextResultsReport(experiment).GetReport())

  def _BuildMessageBody(self, experiment):
    """Return the HTML-wrapped text report used as the e-mail body."""
    text_report = TextResultsReport(experiment, True).GetReport()
    text_report += ("\nResults are stored in %s.\n" %
                    experiment.results_directory)
    return "<pre style='font-size: 13px'>%s</pre>" % text_report

  def _Email(self, experiment):
    """E-mail the results to the current user and experiment.email_to.

    Nothing is sent when the "no_email" config is set, nor when every
    benchmark run was a cache hit and no explicit recipients were given.

    Args:
      experiment: The finished experiment.
    """
    # Only email by default if a new run was completed.
    send_mail = any(not benchmark_run.cache_hit
                    for benchmark_run in experiment.benchmark_runs)
    if ((not send_mail and not experiment.email_to)
        or config.GetConfig("no_email")):
      return

    label_names = [label.name for label in experiment.labels]
    subject = "%s: %s" % (experiment.name, " vs. ".join(label_names))

    text_report = self._BuildMessageBody(experiment)
    html_report = HTMLResultsReport(experiment).GetReport()
    attachment = EmailSender.Attachment("report.html", html_report)
    # email_to may be unset/empty; the current user is always a recipient.
    email_to = [getpass.getuser()] + list(experiment.email_to or [])
    EmailSender().SendEmail(email_to,
                            subject,
                            text_report,
                            attachments=[attachment],
                            msg_type="html")

  def _StoreResults(self, experiment):
    """Write the experiment file, reports and per-run results to disk.

    The results directory is removed and recreated first.  Nothing is
    stored when the run was terminated.

    Args:
      experiment: The finished experiment.
    """
    if self._terminated:
      return
    results_directory = experiment.results_directory
    FileUtils().RmDir(results_directory)
    FileUtils().MkDirP(results_directory)
    self.l.LogOutput("Storing experiment file in %s." % results_directory)
    experiment_file_path = os.path.join(results_directory,
                                        "experiment.exp")
    FileUtils().WriteFile(experiment_file_path, experiment.experiment_file)

    self.l.LogOutput("Storing results report in %s." % results_directory)
    results_table_path = os.path.join(results_directory, "results.html")
    report = HTMLResultsReport(experiment).GetReport()
    if self.json_report:
      JSONResultsReport(experiment).GetReport(results_directory)
    FileUtils().WriteFile(results_table_path, report)

    self.l.LogOutput("Storing email message body in %s." % results_directory)
    msg_file_path = os.path.join(results_directory, "msg_body.html")
    FileUtils().WriteFile(msg_file_path, self._BuildMessageBody(experiment))

    self.l.LogOutput("Storing results of each benchmark run.")
    for benchmark_run in experiment.benchmark_runs:
      if benchmark_run.result:
        # Keep only alphanumeric characters for the directory name.  Built
        # with join() so the result is a string on every Python version.
        benchmark_run_name = "".join(
            c for c in benchmark_run.name if c.isalnum())
        benchmark_run_path = os.path.join(results_directory,
                                          benchmark_run_name)
        benchmark_run.result.CopyResultsTo(benchmark_run_path)
        benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp)

  def Run(self):
    """Run the experiment, print the table, then store and e-mail results.

    Storing and e-mailing are skipped when the run was terminated.
    """
    self._Run(self._experiment)
    self._PrintTable(self._experiment)
    if not self._terminated:
      self._StoreResults(self._experiment)
      self._Email(self._experiment)
244
class DutWorker(Thread):
    """Worker thread bound to one dut (device under test).

    The thread repeatedly asks the scheduler for a benchmark_run matching the
    dut's current label and executes it; when none is left it asks for a new
    label and reimages the dut onto it.
    """

    def __init__(self, dut, sched):
        """Create (but do not start) the worker.

        Args:
          dut: the machine this worker drives; its 'name' and 'label'
            attributes are used.
          sched: the scheduler that hands out benchmark_runs and labels.
        """
        super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
        self._dut = dut
        self._sched = sched
        self._stat_num_br_run = 0
        self._stat_num_reimage = 0
        self._stat_annotation = ""
        self._l = logger.GetLogger(self._sched._experiment.log_dir)
        self.daemon = True
        self._terminated = False
        self._active_br = None
        # Race condition accessing _active_br between _execute_benchmark_run
        # and terminate, so lock it up.
        self._active_br_lock = Lock()

    def terminate(self):
        """Ask the worker to stop and terminate the running benchmark_run."""
        self._terminated = True
        with self._active_br_lock:
            if self._active_br is not None:
                # BenchmarkRun.Terminate() terminates any running testcase via
                # suite_runner.Terminate and updates timeline.
                self._active_br.Terminate()

    def run(self):
        """Do the "run-test->(optionally reimage)->run-test" chore.

        Note - 'br' below means 'benchmark_run'.
        """

        self._setup_dut_label()
        try:
            self._l.LogOutput("{} started.".format(self))
            while not self._terminated:
                br = self._sched.get_benchmark_run(self._dut)
                if br is None:
                    # No br left for this label. Considering reimaging.
                    label = self._sched.allocate_label(self._dut)
                    if label is None:
                        # No br even for other labels. We are done.
                        self._l.LogOutput("ImageManager found no label "
                                          "for dut, stopping working "
                                          "thread {}.".format(self))
                        break
                    # _reimage returns non-zero on failure.
                    if self._reimage(label):
                        # Reimage to run other br fails, dut is doomed, stop
                        # this thread.
                        self._l.LogWarning("Re-image failed, dut "
                                           "in an unstable state, stopping "
                                           "working thread {}.".format(self))
                        break
                else:
                    # Execute the br.
                    self._execute_benchmark_run(br)
        finally:
            self._stat_annotation = "finished"
            # Thread finishes. Notify scheduler that I'm done.
            self._sched.dut_worker_finished(self)

    def _reimage(self, label):
        """Reimage the dut onto label.

        Args:
          label: the label to reimage onto dut.

        Returns:
          0 if successful, otherwise 1 (terminated, ImageMachine returned a
          non-zero value, or ImageMachine raised an exception).
        """

        # Termination could happen anywhere, check it.
        if self._terminated:
            return 1

        self._l.LogOutput('Reimaging {} using {}'.format(self, label))
        self._stat_num_reimage += 1
        self._stat_annotation = 'reimaging using "{}"'.format(label.name)
        try:
            # Note, only 1 reimage at any given time, this is guaranteed in
            # ImageMachine, so no sync needed below.
            retval = self._sched._experiment.machine_manager.ImageMachine(
                self._dut, label)
        except Exception:
            # Treat any failure as "reimage failed", but leave a trace of
            # what went wrong instead of swallowing it silently.
            self._l.LogWarning(
                'Reimaging {} raised an exception:\n{}'.format(
                    self, traceback.format_exc()))
            return 1
        if retval:
            return 1

        self._dut.label = label
        return 0

    def _execute_benchmark_run(self, br):
        """Execute a single benchmark_run.

        Does nothing when the worker has been terminated.  Exceptions raised
        by the ownership assertion or by br.run() are not caught here; the
        finally clause only guarantees that the experiment is told the br is
        finished and that _active_br is cleared.

        Args:
          br: the benchmark_run to execute.
        """

        # Termination could happen anywhere, check it.
        if self._terminated:
            return

        self._l.LogOutput('{} started working on {}'.format(self, br))
        self._stat_num_br_run += 1
        self._stat_annotation = 'executing {}'.format(br)
        try:
            assert br.owner_thread is None
            br.owner_thread = self
            with self._active_br_lock:
                self._active_br = br
            br.run()
        finally:
            self._sched._experiment.BenchmarkRunFinished(br)
            with self._active_br_lock:
                self._active_br = None

    def _setup_dut_label(self):
        """Try to match dut image with a certain experiment label.

        If such match is found, we just skip doing reimage and jump to execute
        some benchmark_runs.  If reading the checksum raises, the traceback
        is printed to stdout and the dut's label is reset to None; if the
        command fails or no label matches, the dut's label is left untouched.
        """

        checksum_file = "/usr/local/osimage_checksum_file"
        try:
            executer = command_executer.GetCommandExecuter()
            rv, checksum, _ = executer.CrosRunCommand(
                "cat " + checksum_file,
                return_output=True,
                chromeos_root=self._sched._labels[0].chromeos_root,
                machine=self._dut.name)
            if rv == 0:
                checksum = checksum.strip()
                for l in self._sched._labels:
                    if l.checksum == checksum:
                        self._l.LogOutput(
                            "Dut '{}' is pre-installed with '{}'".format(
                                self._dut.name, l))
                        self._dut.label = l
                        return
        except Exception:
            traceback.print_exc(file=sys.stdout)
            self._dut.label = None

    def __str__(self):
        return 'DutWorker[dut="{}", label="{}"]'.format(
            self._dut.name,
            self._dut.label.name if self._dut.label else "None")

    def dut(self):
        """Return the dut this worker is bound to."""
        return self._dut

    def status_str(self):
        """Report thread status."""

        return ('Worker thread "{}", label="{}", benchmark_run={}, '
                'reimage={}, now {}'.format(
                    self._dut.name,
                    ('None' if self._dut.label is None
                     else self._dut.label.name),
                    self._stat_num_br_run,
                    self._stat_num_reimage,
                    self._stat_annotation))
405
406
class Schedv2(object):
    """New scheduler for crosperf.

    Owns one DutWorker thread per dut and hands out benchmark_runs and labels
    to them.  The worker lists are mutated by the worker threads themselves
    (see dut_worker_finished), so every method that walks them works on a
    snapshot taken under _workers_lock.
    """

    def __init__(self, experiment):
        """Build the scheduling state for experiment (threads not started).

        Args:
          experiment: the experiment whose labels, benchmark_runs and
            machine_manager are scheduled.
        """
        self._experiment = experiment
        self._l = logger.GetLogger(experiment.log_dir)

        # Create shortcuts to nested data structure. "_duts" points to a list
        # of locked machines. _labels points to a list of all labels.
        self._duts = self._experiment.machine_manager._all_machines
        self._labels = self._experiment.labels

        # Mapping from label to a list of benchmark_runs.
        self._label_brl_map = {label: [] for label in self._labels}
        for br in self._experiment.benchmark_runs:
            assert br.label in self._label_brl_map
            self._label_brl_map[br.label].append(br)

        # Use machine image manager to calculate initial label allocation.
        self._mim = MachineImageManager(self._labels, self._duts)
        self._mim.compute_initial_allocation()

        # Create worker thread, 1 per dut.
        self._active_workers = [DutWorker(dut, self) for dut in self._duts]
        self._finished_workers = []

        # Bookkeeping for synchronization.
        self._workers_lock = Lock()
        # Per-object locks.  The locks for all known labels are created up
        # front so that worker threads never have to insert a missing entry
        # for them concurrently.
        self._lock_map = defaultdict(Lock)
        for label in self._labels:
            self._lock_map[label] = Lock()

        # Termination flag.
        self._terminated = False

    def _check_machines_are_same(self):
        """Check if all machines are the same.

        Returns:
          True if the machine manager reports identical machines for every
          label, False as soon as one label fails the check.
        """

        for l in self._labels:
            if not self._experiment.machine_manager.AreAllMachineSame(l):
                self._l.LogError('All machines are NOT same for '
                                 'label "{}" ...'.format(l.name))
                return False
            self._l.LogOutput('Machines are same for '
                              'label "{}" ...'.format(l.name))
        return True

    def _snapshot_active_workers(self):
        """Return a copy of the active worker list taken under the lock."""
        with self._workers_lock:
            return list(self._active_workers)

    def run_sched(self):
        """Start all dut worker threads and return immediately.

        Raises:
          RuntimeError: if the machines are not the same for some label.
        """

        if not self._check_machines_are_same():
            raise RuntimeError("All machines are not same.")

        # Iterate a snapshot: a worker that finishes right away removes
        # itself from _active_workers, which would otherwise make this loop
        # skip the next worker.
        for worker in self._snapshot_active_workers():
            worker.start()

    def get_benchmark_run(self, dut):
        """Get a benchmark_run (br) object for a certain dut.

        Arguments:
          dut: the dut for which a br is returned.

        Returns:
          A br with its label matching that of the dut. If no such br could be
          found, return None (this usually means a reimage is required for the
          dut).
        """

        # If terminated, stop providing any br.
        if self._terminated:
            return None

        # If dut bears an unrecognized label, return None.
        if dut.label is None:
            return None

        # If br list for the dut's label is empty (that means all brs for this
        # label have been done) , return None.
        with self._lock_on(dut.label):
            brl = self._label_brl_map[dut.label]
            if not brl:
                return None
            # Return the first br.
            return brl.pop(0)

    def allocate_label(self, dut):
        """Allocate a label to a dut.

        The work is delegated to MachineImageManager.

        The dut_worker calling this method is responsible for reimage the dut to
        this label.

        Arguments:
          dut: the dut that a new label is to be allocated for.

        Returns:
          The label or None.
        """

        if self._terminated:
            return None

        return self._mim.allocate(dut)

    def dut_worker_finished(self, dut_worker):
        """Notify schedv2 that the dut_worker thread finished.

        Arguments:
          dut_worker: the thread that is about to end.
        """

        self._l.LogOutput("{} finished.".format(dut_worker))
        with self._workers_lock:
            self._active_workers.remove(dut_worker)
            self._finished_workers.append(dut_worker)

    def is_complete(self):
        """Return True when no worker is active any more."""
        return len(self._active_workers) == 0

    def _lock_on(self, object):
        """Return the lock associated with object (created on first use)."""
        return self._lock_map[object]

    def terminate(self):
        """Mark flag so we stop providing br/reimages.

        Also terminate each DutWorker, so they refuse to execute br or reimage.
        """

        self._terminated = True
        # Snapshot for the same reason as in run_sched: workers drop out of
        # _active_workers while we are walking it.
        for dut_worker in self._snapshot_active_workers():
            dut_worker.terminate()

    def threads_status_as_string(self):
        """Report the dut worker threads status."""

        with self._workers_lock:
            active = list(self._active_workers)
            finished = list(self._finished_workers)

        lines = [
            "{} active threads, {} finished threads.".format(
                len(active), len(finished)),
            "  Active threads:",
        ]
        lines.extend('    ' + dw.status_str() for dw in active)
        if finished:
            lines.append("  Finished threads:")
            lines.extend('    ' + dw.status_str() for dw in finished)
        return "\n".join(lines)
549
550
class MockExperimentRunner(ExperimentRunner):
  """Mocked ExperimentRunner for testing.

  Every step only logs what the real runner would have done.
  """

  def __init__(self, experiment, json_report=False):
    """Set up the mock runner.

    Args:
      experiment: The experiment that would be run.
      json_report: Forwarded to ExperimentRunner, whose constructor requires
        it; defaults to False so existing one-argument callers keep working.
    """
    super(MockExperimentRunner, self).__init__(experiment, json_report)

  def _Run(self, experiment):
    """Log the experiment name instead of running it."""
    self.l.LogOutput("Would run the following experiment: '%s'." %
                     experiment.name)

  def _PrintTable(self, experiment):
    """Log instead of printing the results table."""
    self.l.LogOutput("Would print the experiment table.")

  def _Email(self, experiment):
    """Log instead of sending the result e-mail."""
    self.l.LogOutput("Would send result email.")

  def _StoreResults(self, experiment):
    """Log instead of storing the results."""
    self.l.LogOutput("Would store the results.")
569