benchmark_run.py revision ba64928c5dcbacbc70b4358881a89ad96227164d
1#!/usr/bin/python
2
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import datetime
8import os
9import threading
10import time
11import traceback
12
13from utils import command_executer
14from utils import timeline
15
16from machine_manager import NonMatchingMachines
17from suite_runner import SuiteRunner
18from results_cache import MockResult
19from results_cache import MockResultsCache
20from results_cache import Result
21from results_cache import ResultsCache
22from results_cache import TelemetryResult
23
24
# States recorded on a BenchmarkRun's timeline (via timeline.Record) as the
# run moves from creation to completion.
STATUS_FAILED = "FAILED"
STATUS_SUCCEEDED = "SUCCEEDED"
STATUS_IMAGING = "IMAGING"
STATUS_RUNNING = "RUNNING"
STATUS_WAITING = "WAITING"
STATUS_PENDING = "PENDING"
31
class BenchmarkRun(threading.Thread):
  """Thread that produces the result of one iteration of a benchmark.

  The result is read from the results cache when available; otherwise a
  machine is acquired, imaged and the test suite is run on it. Progress is
  recorded on self.timeline using the STATUS_* constants.
  """

  def __init__(self, name, benchmark,
               label,
               iteration,
               cache_conditions,
               machine_manager,
               logger_to_use,
               log_level,
               share_cache):
    """Store the run configuration and mark the run as pending.

    Args:
      name: Thread name; also used as a prefix in log messages.
      benchmark: Benchmark description. test_args, perf_args, suite,
        test_name, show_all_results and run_local are read from it.
      label: Label description. chromeos_image, chromeos_root, board and
        cache_only are read from it.
      iteration: Iteration identifier forwarded to the results cache.
      cache_conditions: Conditions forwarded to the results cache.
      machine_manager: Object used to acquire, image and release machines.
      logger_to_use: Logger providing LogOutput and LogError.
      log_level: Log level forwarded to the helpers created by this run.
      share_cache: Shared-cache setting forwarded to the results cache.

    Raises:
      Exception: if benchmark.perf_args is set (and not discarded) but does
        not start with "record" or "stat".
    """
    threading.Thread.__init__(self)
    self.name = name
    self._logger = logger_to_use
    self.log_level = log_level
    self.benchmark = benchmark
    self.iteration = iteration
    self.label = label
    self.result = None
    self.terminated = False
    self.retval = None
    self.run_completed = False
    self.machine_manager = machine_manager
    self.suite_runner = SuiteRunner(self._logger, self.log_level)
    self.machine = None
    self.cache_conditions = cache_conditions
    self.runs_complete = 0
    self.cache_hit = False
    self.failure_reason = ""
    self.test_args = benchmark.test_args
    # Needs self.benchmark and self._logger, which are set above.
    self.profiler_args = self._GetExtraAutotestArgs()
    self._ce = command_executer.GetCommandExecuter(self._logger,
                                                   log_level=self.log_level)
    self.timeline = timeline.Timeline()
    self.timeline.Record(STATUS_PENDING)
    self.share_cache = share_cache

    # This is used by schedv2.
    self.owner_thread = None

  def ReadCache(self):
    """Set up self.cache for this run and load any cached result.

    Stores the cached result (or None) in self.result and sets
    self.cache_hit accordingly.
    """
    # Just use the first machine for running the cached version,
    # without locking it.
    self.cache = ResultsCache()
    self.cache.Init(self.label.chromeos_image,
                    self.label.chromeos_root,
                    self.benchmark.test_name,
                    self.iteration,
                    self.test_args,
                    self.profiler_args,
                    self.machine_manager,
                    self.label.board,
                    self.cache_conditions,
                    self._logger,
                    self.log_level,
                    self.label,
                    self.share_cache,
                    self.benchmark.suite,
                    self.benchmark.show_all_results,
                    self.benchmark.run_local)

    self.result = self.cache.ReadResult()
    self.cache_hit = self.result is not None

  def run(self):
    """Thread body: obtain self.result and record the final status.

    On a cache hit the cached output is logged. If the label is cache-only
    and nothing is cached, a result with return value 1 is created without
    running anything. Otherwise a machine is acquired, the test is run and
    the result is stored in the cache. Any exception is logged and recorded
    as a failure instead of propagating out of the thread.
    """
    try:
      self.ReadCache()

      if self.result:
        self._logger.LogOutput("%s: Cache hit." % self.name)
        self._logger.LogOutput(self.result.out, print_to_console=False)
        self._logger.LogError(self.result.err, print_to_console=False)

      elif self.label.cache_only:
        self._logger.LogOutput("%s: No cache hit." % self.name)
        output = "%s: No Cache hit." % self.name
        retval = 1
        err = "No cache hit."
        self.result = Result.CreateFromRun(self._logger, self.log_level,
                                           self.label,
                                           output, err, retval,
                                           self.benchmark.show_all_results,
                                           self.benchmark.test_name,
                                           self.benchmark.suite)

      else:
        self._logger.LogOutput("%s: No cache hit." % self.name)
        self.timeline.Record(STATUS_WAITING)
        # Try to acquire a machine now.
        self.machine = self.AcquireMachine()
        self.result = self.RunTest(self.machine)

        self.cache.remote = self.machine.name
        self.cache.StoreResult(self.result)

      # Terminate() has already recorded the failure; do not overwrite it.
      if self.terminated:
        return

      if not self.result.retval:
        self.timeline.Record(STATUS_SUCCEEDED)
      elif self.timeline.GetLastEvent() != STATUS_FAILED:
        self.failure_reason = "Return value of test suite was non-zero."
        self.timeline.Record(STATUS_FAILED)

    except Exception as e:
      self._logger.LogError("Benchmark run: '%s' failed: %s" % (self.name, e))
      traceback.print_exc()
      if self.timeline.GetLastEvent() != STATUS_FAILED:
        self.timeline.Record(STATUS_FAILED)
        self.failure_reason = str(e)
    finally:
      # In schedv2 mode (owner_thread set) the machine is not locked locally,
      # so there is nothing to release here.
      if self.owner_thread is None and self.machine:
        if not self.machine.IsReachable():
          self._logger.LogOutput("Machine %s is not reachable, removing it."
                                 % self.machine.name)
          self.machine_manager.RemoveMachine(self.machine.name)
        self._logger.LogOutput("Releasing machine: %s" % self.machine.name)
        self.machine_manager.ReleaseMachine(self.machine)
        self._logger.LogOutput("Released machine: %s" % self.machine.name)

  def Terminate(self):
    """Flag the run as terminated, stop the suite runner and record failure."""
    self.terminated = True
    self.suite_runner.Terminate()
    if self.timeline.GetLastEvent() != STATUS_FAILED:
      self.timeline.Record(STATUS_FAILED)
      self.failure_reason = "Thread terminated."

  def AcquireMachine(self):
    """Return a machine to run on, polling until one is available.

    In schedv2 mode (owner_thread set) the owner thread's dut is returned
    directly. Otherwise the machine manager is asked for a machine every
    10 seconds until it provides one.

    Returns:
      The acquired machine.

    Raises:
      Exception: if the run is terminated while waiting for a machine.
    """
    if self.owner_thread is not None:
      # No need to lock machine locally, DutWorker, which is a thread, is
      # responsible for running br.
      return self.owner_thread.dut()
    while True:
      machine = None
      if self.terminated:
        raise Exception("Thread terminated while trying to acquire machine.")
      try:
        machine = self.machine_manager.AcquireMachine(self.label.chromeos_image,
                                                      self.label,
                                                      throw=True)

      except NonMatchingMachines:
        # Force the same image onto all machines, then retry without
        # raising on failure.
        self.machine_manager.ForceSameImageToAllMachines(self.label)
        machine = self.machine_manager.AcquireMachine(self.label.chromeos_image,
                                                      self.label,
                                                      throw=False)

      if machine:
        self._logger.LogOutput("%s: Machine %s acquired at %s" %
                               (self.name,
                                machine.name,
                                datetime.datetime.now()))
        return machine

      # Nothing available yet; wait before polling again.
      sleep_duration = 10
      time.sleep(sleep_duration)

  def _GetExtraAutotestArgs(self):
    """Build the profiler arguments string from benchmark.perf_args.

    perf_args is cleared (and an error logged) when the suite is
    "telemetry" or "test_that".

    Returns:
      The extra test arguments as a single string, or "" when no perf_args
      remain.

    Raises:
      Exception: if perf_args does not start with "record" or "stat".
    """
    if self.benchmark.perf_args and self.benchmark.suite == "telemetry":
      self._logger.LogError("Telemetry does not support profiler.")
      self.benchmark.perf_args = ""

    if self.benchmark.perf_args and self.benchmark.suite == "test_that":
      self._logger.LogError("test_that does not support profiler.")
      self.benchmark.perf_args = ""

    if not self.benchmark.perf_args:
      return ""

    perf_args_list = self.benchmark.perf_args.split(" ")
    if perf_args_list[0] not in ("record", "stat"):
      raise Exception("perf_args must start with either record or stat")
    # Insert "-a" right after the perf subcommand.
    perf_args = " ".join([perf_args_list[0], "-a"] + perf_args_list[1:])
    extra_test_args = ["--profiler=custom_perf",
                       "--profiler_args='perf_options=\"%s\"'" % perf_args]
    return " ".join(extra_test_args)

  def RunTest(self, machine):
    """Image the machine (unless in schedv2 mode) and run the benchmark.

    Args:
      machine: Machine to run on; its name is passed to the suite runner.

    Returns:
      The object built by Result.CreateFromRun from the suite runner's
      return value, output and error output.
    """
    self.timeline.Record(STATUS_IMAGING)
    # In schedv2 mode, do not even call ImageMachine. Machine image is
    # guaranteed.
    if self.owner_thread is None:
      self.machine_manager.ImageMachine(machine,
                                        self.label)
    self.timeline.Record(STATUS_RUNNING)
    retval, out, err = self.suite_runner.Run(machine.name,
                                             self.label,
                                             self.benchmark,
                                             self.test_args,
                                             self.profiler_args)
    self.run_completed = True
    return Result.CreateFromRun(self._logger,
                                self.log_level,
                                self.label,
                                out,
                                err,
                                retval,
                                self.benchmark.show_all_results,
                                self.benchmark.test_name,
                                self.benchmark.suite)

  def SetCacheConditions(self, cache_conditions):
    """Replace the cache conditions used by subsequent ReadCache calls."""
    self.cache_conditions = cache_conditions

  def __str__(self):
    """For better debugging."""

    return 'BenchmarkRun[name="{}"]'.format(self.name)
248
249
class MockBenchmarkRun(BenchmarkRun):
  """BenchmarkRun variant for testing, using mock cache and result objects."""

  def ReadCache(self):
    """Set up a MockResultsCache for this run and load any cached result.

    Stores the cached result (or None) in self.result and sets
    self.cache_hit accordingly.
    """
    # Just use the first machine for running the cached version,
    # without locking it.
    self.cache = MockResultsCache()
    self.cache.Init(self.label.chromeos_image,
                    self.label.chromeos_root,
                    self.benchmark.test_name,
                    self.iteration,
                    self.test_args,
                    self.profiler_args,
                    self.machine_manager,
                    self.label.board,
                    self.cache_conditions,
                    self._logger,
                    self.log_level,
                    self.label,
                    self.share_cache,
                    self.benchmark.suite,
                    self.benchmark.show_all_results,
                    self.benchmark.run_local)

    self.result = self.cache.ReadResult()
    self.cache_hit = self.result is not None

  def RunTest(self, machine):
    """Image the machine, run the benchmark and wrap the output in a MockResult.

    Unlike BenchmarkRun.RunTest this does not call Result.CreateFromRun.

    Args:
      machine: Machine to run on; its name is passed to the suite runner.

    Returns:
      A MockResult whose out, err and retval come from the suite runner.
    """
    self.timeline.Record(STATUS_IMAGING)
    self.machine_manager.ImageMachine(machine,
                                      self.label)
    self.timeline.Record(STATUS_RUNNING)
    # Same arguments as BenchmarkRun.RunTest passes to the suite runner.
    retval, out, err = self.suite_runner.Run(machine.name,
                                             self.label,
                                             self.benchmark,
                                             self.test_args,
                                             self.profiler_args)
    self.run_completed = True
    rr = MockResult("logger", self.label, self.log_level)
    rr.out = out
    rr.err = err
    rr.retval = retval
    return rr
297