benchmark_run.py revision 5ea9f006b5de0d882d5b51da243806b7cac69938
1#!/usr/bin/python
2
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import datetime
8import os
9import threading
10import time
11import traceback
12
13from utils import command_executer
14from utils import timeline
15
16from machine_manager import NonMatchingMachines
17from suite_runner import SuiteRunner
18from results_cache import MockResult
19from results_cache import MockResultsCache
20from results_cache import Result
21from results_cache import ResultsCache
22from results_cache import TelemetryResult
23
24
# States recorded on a BenchmarkRun's timeline, listed in the order a run
# normally goes through them; a run ends in either SUCCEEDED or FAILED.
STATUS_PENDING = 'PENDING'
STATUS_WAITING = 'WAITING'
STATUS_IMAGING = 'IMAGING'
STATUS_RUNNING = 'RUNNING'
STATUS_SUCCEEDED = 'SUCCEEDED'
STATUS_FAILED = 'FAILED'
31
class BenchmarkRun(threading.Thread):
  """Thread that produces the result for one iteration of a benchmark.

  run() first consults the results cache.  On a cache miss (and unless the
  label is marked cache_only) it acquires a machine, images it, runs the
  suite and stores the result back into the cache.  Progress is recorded on
  self.timeline using the STATUS_* constants of this module.
  """

  # Seconds to wait before retrying when no machine could be acquired.
  _ACQUIRE_RETRY_SECONDS = 10

  def __init__(self, name, benchmark,
               label,
               iteration,
               cache_conditions,
               machine_manager,
               logger_to_use,
               log_level,
               share_cache):
    """Store the run's configuration; nothing is executed until run().

    Args:
      name: Name of this run; used in log messages and in __str__.
      benchmark: Benchmark description.  Its test_args, perf_args, suite,
        test_name, show_all_results and run_local attributes are read.
      label: Label description.  Its chromeos_image, chromeos_root, board and
        cache_only attributes are read.
      iteration: Iteration value handed to the results cache.
      cache_conditions: Cache conditions handed to the results cache.
      machine_manager: Object used to acquire, image and release machines.
      logger_to_use: Logger providing LogOutput() and LogError().
      log_level: Log level passed on to the helpers created here.
      share_cache: Shared-cache setting handed to the results cache.

    Raises:
      Exception: If benchmark.perf_args does not start with "record" or
        "stat" (see _GetExtraAutotestArgs).
    """
    threading.Thread.__init__(self)
    self.name = name
    self._logger = logger_to_use
    self.log_level = log_level
    self.benchmark = benchmark
    self.iteration = iteration
    self.label = label
    self.result = None
    self.terminated = False
    self.retval = None
    self.run_completed = False
    self.machine_manager = machine_manager
    self.suite_runner = SuiteRunner(self._logger, self.log_level)
    self.machine = None
    # Created by ReadCache().
    self.cache = None
    self.cache_conditions = cache_conditions
    self.runs_complete = 0
    self.cache_hit = False
    self.failure_reason = ""
    self.test_args = benchmark.test_args
    # Needs self.benchmark and self._logger, which are set above.
    self.profiler_args = self._GetExtraAutotestArgs()
    self._ce = command_executer.GetCommandExecuter(self._logger,
                                                   log_level=self.log_level)
    self.timeline = timeline.Timeline()
    self.timeline.Record(STATUS_PENDING)
    self.share_cache = share_cache

    # This is used by schedv2.
    self.owner_thread = None

  def ReadCache(self):
    """Create the results cache for this run and try to read a result.

    Sets self.cache, self.result (None when nothing was read) and
    self.cache_hit.
    """
    # self.machine is passed as it currently is; when called from run() no
    # machine has been acquired yet, so nothing is locked for the lookup.
    self.cache = ResultsCache()
    self.cache.Init(self.label.chromeos_image,
                    self.label.chromeos_root,
                    self.benchmark.test_name,
                    self.iteration,
                    self.test_args,
                    self.profiler_args,
                    self.machine_manager,
                    self.machine,
                    self.label.board,
                    self.cache_conditions,
                    self._logger,
                    self.log_level,
                    self.label,
                    self.share_cache,
                    self.benchmark.suite,
                    self.benchmark.show_all_results,
                    self.benchmark.run_local)

    self.result = self.cache.ReadResult()
    self.cache_hit = (self.result is not None)

  def run(self):
    """Thread entry point: get a result from the cache or from a real run.

    Any exception is logged and turned into a STATUS_FAILED timeline entry
    with self.failure_reason set.  A machine acquired by this run is given
    back in every case.
    """
    try:
      self.ReadCache()

      if self.result:
        self._logger.LogOutput("%s: Cache hit." % self.name)
        self._logger.LogOutput(self.result.out, print_to_console=False)
        self._logger.LogError(self.result.err, print_to_console=False)

      elif self.label.cache_only:
        # The label forbids real runs: synthesize a failing result instead.
        self._logger.LogOutput("%s: No cache hit." % self.name)
        output = "%s: No Cache hit." % self.name
        retval = 1
        err = "No cache hit."
        self.result = Result.CreateFromRun(self._logger, self.log_level,
                                           self.label,
                                           output, err, retval,
                                           self.benchmark.show_all_results,
                                           self.benchmark.test_name,
                                           self.benchmark.suite)

      else:
        self._logger.LogOutput("%s: No cache hit." % self.name)
        self.timeline.Record(STATUS_WAITING)
        # Try to acquire a machine now.
        self.machine = self.AcquireMachine()
        self.cache.machine = self.machine
        self.result = self.RunTest(self.machine)

        self.cache.remote = self.machine.name
        self.cache.StoreResult(self.result)

      if self.terminated:
        # Terminate() has already recorded the failure.
        return

      if not self.result.retval:
        self.timeline.Record(STATUS_SUCCEEDED)
      elif self.timeline.GetLastEvent() != STATUS_FAILED:
        self.failure_reason = "Return value of test suite was non-zero."
        self.timeline.Record(STATUS_FAILED)

    except Exception as e:
      # Thread boundary: report the failure instead of letting it escape.
      self._logger.LogError("Benchmark run: '%s' failed: %s" % (self.name, e))
      traceback.print_exc()
      if self.timeline.GetLastEvent() != STATUS_FAILED:
        self.timeline.Record(STATUS_FAILED)
        self.failure_reason = str(e)
    finally:
      self._ReleaseMachine()

  def _ReleaseMachine(self):
    """Give the machine held by this run back to the machine manager.

    Does nothing in schedv2 mode or when no machine is held.  An unreachable
    machine is removed from the manager first.  The release is attempted even
    if the reachability check or the removal raises.
    """
    if self.owner_thread is not None:
      # In schedv2 mode, we do not lock machine locally. So noop here.
      return
    if not self.machine:
      return
    try:
      if not self.machine.IsReachable():
        self._logger.LogOutput("Machine %s is not reachable, removing it."
                               % self.machine.name)
        self.machine_manager.RemoveMachine(self.machine.name)
    finally:
      self._logger.LogOutput("Releasing machine: %s" % self.machine.name)
      self.machine_manager.ReleaseMachine(self.machine)
      self._logger.LogOutput("Released machine: %s" % self.machine.name)

  def Terminate(self):
    """Flag the run as terminated and stop the suite runner.

    Records STATUS_FAILED with reason "Thread terminated." unless the last
    timeline event already is STATUS_FAILED.
    """
    self.terminated = True
    self.suite_runner.Terminate()
    if self.timeline.GetLastEvent() != STATUS_FAILED:
      self.timeline.Record(STATUS_FAILED)
      self.failure_reason = "Thread terminated."

  def AcquireMachine(self):
    """Return a machine to run on, retrying until one is available.

    In schedv2 mode (owner_thread set) the owner thread's dut() is returned
    without any locking.

    Returns:
      The acquired machine.

    Raises:
      Exception: If Terminate() was called while still waiting.
    """
    if self.owner_thread is not None:
      # No need to lock machine locally, DutWorker, which is a thread, is
      # responsible for running br.
      return self.owner_thread.dut()
    while True:
      if self.terminated:
        raise Exception("Thread terminated while trying to acquire machine.")
      try:
        machine = self.machine_manager.AcquireMachine(self.label.chromeos_image,
                                                      self.label,
                                                      throw=True)
      except NonMatchingMachines:
        # Force the label's image onto all machines, then ask once more,
        # this time without raising.
        self.machine_manager.ForceSameImageToAllMachines(self.label)
        machine = self.machine_manager.AcquireMachine(self.label.chromeos_image,
                                                      self.label,
                                                      throw=False)

      if machine:
        self._logger.LogOutput("%s: Machine %s acquired at %s" %
                               (self.name,
                                machine.name,
                                datetime.datetime.now()))
        return machine
      time.sleep(self._ACQUIRE_RETRY_SECONDS)

  def _GetExtraAutotestArgs(self):
    """Build the profiler arguments derived from benchmark.perf_args.

    For the "telemetry" and "test_that" suites an error is logged and
    benchmark.perf_args is reset to "" (the benchmark object is modified).

    Returns:
      "" when there are no perf args; otherwise a string of the form
      --profiler=custom_perf --profiler_args='perf_options="<args>"' in
      which "-a" has been inserted right after the perf sub-command.

    Raises:
      Exception: If perf_args does not start with "record" or "stat".
    """
    if self.benchmark.perf_args and self.benchmark.suite == "telemetry":
      self._logger.LogError("Telemetry does not support profiler.")
      self.benchmark.perf_args = ""

    if self.benchmark.perf_args and self.benchmark.suite == "test_that":
      self._logger.LogError("test_that does not support profiler.")
      self.benchmark.perf_args = ""

    if not self.benchmark.perf_args:
      return ""

    perf_args_list = self.benchmark.perf_args.split(" ")
    # Validate before doing any work on the arguments.
    if perf_args_list[0] not in ("record", "stat"):
      raise Exception("perf_args must start with either record or stat")
    perf_args = " ".join([perf_args_list[0], "-a"] + perf_args_list[1:])
    extra_test_args = ["--profiler=custom_perf",
                       "--profiler_args='perf_options=\"%s\"'" % perf_args]
    return " ".join(extra_test_args)

  def RunTest(self, machine):
    """Image the machine (unless in schedv2 mode) and run the suite on it.

    Args:
      machine: The machine to run on; its name is passed to the suite runner.

    Returns:
      The Result built from the suite runner's return value and output.
    """
    self.timeline.Record(STATUS_IMAGING)
    if self.owner_thread is None:
      # In schedv2 mode ImageMachine is not even called; the machine image
      # is guaranteed there.
      self.machine_manager.ImageMachine(machine,
                                        self.label)
    self.timeline.Record(STATUS_RUNNING)
    retval, out, err = self.suite_runner.Run(machine.name,
                                             self.label,
                                             self.benchmark,
                                             self.test_args,
                                             self.profiler_args)
    self.run_completed = True
    return Result.CreateFromRun(self._logger,
                                self.log_level,
                                self.label,
                                out,
                                err,
                                retval,
                                self.benchmark.show_all_results,
                                self.benchmark.test_name,
                                self.benchmark.suite)

  def SetCacheConditions(self, cache_conditions):
    """Replace the cache conditions used by the next ReadCache() call."""
    self.cache_conditions = cache_conditions

  def __str__(self):
    """For better debugging."""

    return 'BenchmarkRun[name="{}"]'.format(self.name)
250
251
class MockBenchmarkRun(BenchmarkRun):
  """BenchmarkRun variant that uses MockResultsCache and MockResult."""

  def ReadCache(self):
    """Create a MockResultsCache for this run and try to read a result.

    Sets self.cache, self.result (None when nothing was read) and
    self.cache_hit.
    """
    # self.machine is passed as it currently is; nothing is locked here.
    self.cache = MockResultsCache()
    self.cache.Init(self.label.chromeos_image,
                    self.label.chromeos_root,
                    self.benchmark.test_name,
                    self.iteration,
                    self.test_args,
                    self.profiler_args,
                    self.machine_manager,
                    self.machine,
                    self.label.board,
                    self.cache_conditions,
                    self._logger,
                    self.log_level,
                    self.label,
                    self.share_cache,
                    self.benchmark.suite,
                    self.benchmark.show_all_results,
                    self.benchmark.run_local)

    self.result = self.cache.ReadResult()
    self.cache_hit = (self.result is not None)

  def RunTest(self, machine):
    """Image the machine, run the suite and wrap the output in a MockResult.

    Unlike BenchmarkRun.RunTest this does not go through
    Result.CreateFromRun; the suite runner's output is copied onto a
    MockResult as is.

    Args:
      machine: The machine to run on; its name is passed to the suite runner.

    Returns:
      A MockResult carrying the suite runner's out, err and retval.
    """
    self.timeline.Record(STATUS_IMAGING)
    self.machine_manager.ImageMachine(machine,
                                      self.label)
    self.timeline.Record(STATUS_RUNNING)
    # Same argument list as BenchmarkRun.RunTest passes to the suite runner.
    retval, out, err = self.suite_runner.Run(machine.name,
                                             self.label,
                                             self.benchmark,
                                             self.test_args,
                                             self.profiler_args)
    self.run_completed = True
    rr = MockResult("logger", self.label, self.log_level)
    rr.out = out
    rr.err = err
    rr.retval = retval
    return rr
300