1# Copyright 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Dispatches tests, either sharding or replicating them.
6
7To dispatch, performs the following steps:
8* Create a test collection factory, using the given tests
9  - If sharding: test collection factory returns the same shared test collection
10    to all test runners
  - If replicating: test collection factory returns a unique test collection to
12    each test runner, with the same set of tests in each.
13* Get the list of devices to run on
14* Create test runners
15* Run each test runner in its own thread, pulling tests from the test collection
16  generated from the test collection factory until there are no tests left.
17"""
18
19import logging
20import threading
21
22from pylib import android_commands
23from pylib import constants
24from pylib.utils import reraiser_thread
25from pylib.utils import watchdog_timer
26
27import base_test_result
28
29
# Default watchdog timeout, in seconds. RunTests uses it as the default for
# both its test_timeout and setup_timeout parameters.
DEFAULT_TIMEOUT = 7 * 60  # seven minutes
31
32
33class _ThreadSafeCounter(object):
34  """A threadsafe counter."""
35
36  def __init__(self):
37    self._lock = threading.Lock()
38    self._value = 0
39
40  def GetAndIncrement(self):
41    """Get the current value and increment it atomically.
42
43    Returns:
44      The value before incrementing.
45    """
46    with self._lock:
47      pre_increment = self._value
48      self._value += 1
49      return pre_increment
50
51
52class _Test(object):
53  """Holds a test with additional metadata."""
54
55  def __init__(self, test, tries=0):
56    """Initializes the _Test object.
57
58    Args:
59      test: The test.
60      tries: Number of tries so far.
61    """
62    self.test = test
63    self.tries = tries
64
65
66class _TestCollection(object):
67  """A threadsafe collection of tests.
68
69  Args:
70    tests: List of tests to put in the collection.
71  """
72
73  def __init__(self, tests=[]):
74    self._lock = threading.Lock()
75    self._tests = []
76    self._tests_in_progress = 0
77    # Used to signal that an item is avaliable or all items have been handled.
78    self._item_avaliable_or_all_done = threading.Event()
79    for t in tests:
80      self.add(t)
81
82  def _pop(self):
83    """Pop a test from the collection.
84
85    Waits until a test is avaliable or all tests have been handled.
86
87    Returns:
88      A test or None if all tests have been handled.
89    """
90    while True:
91      # Wait for a test to be avaliable or all tests to have been handled.
92      self._item_avaliable_or_all_done.wait()
93      with self._lock:
94        # Check which of the two conditions triggered the signal.
95        if self._tests_in_progress == 0:
96          return None
97        try:
98          return self._tests.pop(0)
99        except IndexError:
100          # Another thread beat us to the avaliable test, wait again.
101          self._item_avaliable_or_all_done.clear()
102
103  def add(self, test):
104    """Add an test to the collection.
105
106    Args:
107      test: A test to add.
108    """
109    with self._lock:
110      self._tests.append(test)
111      self._item_avaliable_or_all_done.set()
112      self._tests_in_progress += 1
113
114  def test_completed(self):
115    """Indicate that a test has been fully handled."""
116    with self._lock:
117      self._tests_in_progress -= 1
118      if self._tests_in_progress == 0:
119        # All tests have been handled, signal all waiting threads.
120        self._item_avaliable_or_all_done.set()
121
122  def __iter__(self):
123    """Iterate through tests in the collection until all have been handled."""
124    while True:
125      r = self._pop()
126      if r is None:
127        break
128      yield r
129
130
def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
                       num_retries, tag_results_with_device=False):
  """Drains test_collection with the given runner, recording the results.

  Every test pulled from the collection is run once. If the runner hands back
  something to retry and the retry budget is not used up, only the passing
  results are recorded and the retry is queued on test_collection as a new
  _Test; otherwise the complete result is recorded.

  Args:
    runner: A TestRunner object used to run the tests.
    test_collection: A _TestCollection from which to get _Test objects to run.
    out_results: A list that TestRunResults objects are appended to.
    watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
        It is reset before each test.
    num_retries: Number of retries for a test.
    tag_results_with_device: If True, prefixes each result name with the last
        four characters of runner.device. Used when replicating to identify
        which device ran each copy of the test, and to ensure each copy of the
        test is recorded separately.

  Raises:
    android_commands.errors.DeviceUnresponsiveError: If the runner's device is
        found not to be attached before a test is run.
  """

  def PrefixResultsWithDevice(untagged_results):
    """Returns new TestRunResults whose result names carry a device prefix.

    The prefix is the last four characters of runner.device. The individual
    result objects are renamed in place and then added to a fresh
    TestRunResults. Per the original author's note, results are stored in a
    set hashed on name and tag, so replicated runs need distinct names to be
    kept apart.
    """
    tagged_results = base_test_result.TestRunResults()
    for single_result in untagged_results.GetAll():
      tagged_name = '%s_%s' % (runner.device[-4:], single_result.GetName())
      single_result.SetName(tagged_name)
      tagged_results.AddResult(single_result)
    return tagged_results

  for queued in test_collection:
    watcher.Reset()
    try:
      if not android_commands.IsDeviceAttached(runner.device):
        # Device is unresponsive, stop handling tests on this device.
        message = 'Device %s is unresponsive.' % runner.device
        logging.warning(message)
        raise android_commands.errors.DeviceUnresponsiveError(message)
      result, retry = runner.RunTest(queued.test)
      if tag_results_with_device:
        result = PrefixResultsWithDevice(result)
      queued.tries += 1
      if not retry or queued.tries > num_retries:
        # Nothing to retry or retry limit reached. Either way, record results.
        out_results.append(result)
      else:
        # Record only what passed; the rest goes back on the queue as a retry.
        passed_only = base_test_result.TestRunResults()
        passed_only.AddResults(result.GetPass())
        out_results.append(passed_only)
        logging.warning('Will retry test, try #%s.' % queued.tries)
        test_collection.add(_Test(test=retry, tries=queued.tries))
    except:
      # Deliberately catches everything: whatever went wrong, put the test
      # back so another device can run it, then let the exception propagate
      # so it is reraised on the main thread.
      test_collection.add(queued)
      raise
    finally:
      # A retry or re-add counts as a separate task, so the test that was
      # popped is always marked as done.
      test_collection.test_completed()
193
194
def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
  """Creates the TestRunner for one device and calls SetUp() on it.

  Note: if the device is unresponsive the failure is logged and no TestRunner
    is added to out_runners.

  Args:
    runner_factory: Callable that takes a device and index and returns a
      TestRunner object.
    device: The device serial number to set up.
    out_runners: List to add the successfully set up TestRunner object.
    threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
  """
  try:
    shard_index = threadsafe_counter.GetAndIncrement()
    logging.warning('Creating shard %s for device %s.', shard_index, device)
    new_runner = runner_factory(device, shard_index)
    new_runner.SetUp()
  except android_commands.errors.DeviceUnresponsiveError as e:
    logging.warning('Failed to create shard for %s: [%s]', device, e)
  else:
    # Only reached when creation and SetUp() both succeeded.
    out_runners.append(new_runner)
216
217
def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
                 tag_results_with_device=False):
  """Runs every test, one worker thread per TestRunner.

  Each worker executes _RunTestsFromQueue with its runner and a collection
  obtained from test_collection_factory. All workers share one results list
  and one watchdog timer.

  Args:
    runners: A list of TestRunner objects.
    test_collection_factory: A callable to generate a _TestCollection object for
        each test runner.
    num_retries: Number of retries for a test.
    timeout: Watchdog timeout in seconds, passed to
        watchdog_timer.WatchdogTimer.
    tag_results_with_device: Forwarded to _RunTestsFromQueue. If True, the
        result names are tagged with the device the test ran on, which is used
        when replicating so that each copy of a test is recorded separately.

  Returns:
    A tuple of (TestRunResults object, exit code). The exit code is
    constants.ERROR_EXIT_CODE if the combined run did not pass, otherwise
    constants.WARNING_EXIT_CODE if a DeviceUnresponsiveError surfaced while
    joining the workers, otherwise 0.
  """
  logging.warning('Running tests with %s test runners.' % (len(runners)))
  collected = []
  watcher = watchdog_timer.WatchdogTimer(timeout)

  worker_threads = []
  for test_runner in runners:
    worker_args = [test_runner, test_collection_factory(), collected, watcher,
                   num_retries, tag_results_with_device]
    worker_threads.append(reraiser_thread.ReraiserThread(
        _RunTestsFromQueue, worker_args, name=test_runner.device[-4:]))
  workers = reraiser_thread.ReraiserThreadGroup(worker_threads)
  run_results = base_test_result.TestRunResults()
  workers.StartAll()

  # An unresponsive device downgrades the run to a warning rather than
  # aborting it; results gathered so far are still reported below.
  exit_code = 0
  try:
    workers.JoinAll(watcher)
  except android_commands.errors.DeviceUnresponsiveError as e:
    logging.error(e)
    exit_code = constants.WARNING_EXIT_CODE

  for partial_results in collected:
    run_results.AddTestRunResults(partial_results)
  # A non-passing run takes precedence over the warning exit code.
  if not run_results.DidRunPass():
    exit_code = constants.ERROR_EXIT_CODE
  return (run_results, exit_code)
263
264
def _CreateRunners(runner_factory, devices, timeout=None):
  """Creates and sets up one test runner per device, in parallel.

  One thread per device runs _SetUp, which creates the runner and calls its
  SetUp().

  Note: if a device is unresponsive the corresponding TestRunner will not be
    included in the returned list. Runners are appended by the setup threads,
    so the list is not necessarily in the same order as devices.

  Args:
    runner_factory: Callable that takes a device and index and returns a
      TestRunner object.
    devices: List of device serial numbers as strings.
    timeout: Watchdog timeout in seconds, passed to
      watchdog_timer.WatchdogTimer. Defaults to None.

  Returns:
    A list of TestRunner objects.
  """
  logging.warning('Creating %s test runners.' % len(devices))
  runners = []
  shard_counter = _ThreadSafeCounter()
  setup_threads = []
  for serial in devices:
    setup_threads.append(reraiser_thread.ReraiserThread(
        _SetUp, [runner_factory, serial, runners, shard_counter],
        name=serial[-4:]))
  group = reraiser_thread.ReraiserThreadGroup(setup_threads)
  group.StartAll()
  group.JoinAll(watchdog_timer.WatchdogTimer(timeout))
  return runners
291
292
def _TearDownRunners(runners, timeout=None):
  """Calls TearDown() on every test runner, each on its own thread.

  Args:
    runners: A list of TestRunner objects.
    timeout: Watchdog timeout in seconds, passed to
      watchdog_timer.WatchdogTimer. Defaults to None.
  """
  teardown_threads = []
  for test_runner in runners:
    teardown_threads.append(reraiser_thread.ReraiserThread(
        test_runner.TearDown, name=test_runner.device[-4:]))
  group = reraiser_thread.ReraiserThreadGroup(teardown_threads)
  group.StartAll()
  group.JoinAll(watchdog_timer.WatchdogTimer(timeout))
305
306
307
def _GetAttachedDevices(wait_for_debugger=False, test_device=None):
  """Get all attached devices.

  If a specific test_device is given, only that device is returned. If we are
  using a debugger, limit to only one device.

  Args:
    wait_for_debugger: True if this run will use a debugger.
    test_device: Name of a specific device to use.

  Returns:
    A list of attached devices.

  Raises:
    AssertionError: If test_device is given but is not among the attached
      devices.
  """
  attached_devices = android_commands.GetAttachedDevices()
  if test_device:
    # Raised explicitly instead of through an assert statement so the check
    # is still performed when Python runs with -O; the exception type is the
    # same one the assert produced.
    if test_device not in attached_devices:
      raise AssertionError(
          'Did not find device %s among attached device. Attached devices: %s'
          % (test_device, ', '.join(attached_devices)))
    attached_devices = [test_device]

  if len(attached_devices) > 1 and wait_for_debugger:
    logging.warning('Debugger can not be sharded, using first available device')
    attached_devices = attached_devices[:1]

  return attached_devices
334
335
def RunTests(tests, runner_factory, wait_for_debugger, test_device,
             shard=True,
             build_type='Debug',
             test_timeout=DEFAULT_TIMEOUT,
             setup_timeout=DEFAULT_TIMEOUT,
             num_retries=2):
  """Run all tests on attached devices, retrying tests that don't pass.

  Args:
    tests: List of tests to run.
    runner_factory: Callable that takes a device and index and returns a
        TestRunner object.
    wait_for_debugger: True if this test is using a debugger.
    test_device: A specific device to run tests on, or None.
    shard: True if we should shard, False if we should replicate tests.
      - Sharding tests will distribute tests across all test runners through a
        shared test collection.
      - Replicating tests will copy all tests to each test runner through a
        unique test collection for each test runner.
    build_type: Either 'Debug' or 'Release'. Not used by this function; kept
        so existing callers keep working.
    test_timeout: Watchdog timeout in seconds for running tests.
    setup_timeout: Watchdog timeout in seconds for creating and cleaning up
        test runners.
    num_retries: Number of retries for a test.

  Returns:
    A tuple of (base_test_result.TestRunResults object, exit code). If there
    are no tests or no devices, the results are empty and the exit code is
    constants.ERROR_EXIT_CODE.
  """
  if not tests:
    logging.error('No tests to run.')
    return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)

  if shard:
    # Generate a shared _TestCollection object for all test runners, so they
    # draw from a common pool of tests.
    shared_test_collection = _TestCollection([_Test(t) for t in tests])
    test_collection_factory = lambda: shared_test_collection
    tag_results_with_device = False
    log_string = 'sharded across devices'
  else:
    # Generate a unique _TestCollection object for each test runner, but use
    # the same set of tests.
    test_collection_factory = lambda: _TestCollection([_Test(t) for t in tests])
    tag_results_with_device = True
    log_string = 'replicated on each device'

  devices = _GetAttachedDevices(wait_for_debugger, test_device)
  if not devices:
    # Without a device no runner is created and no test is executed, so
    # report an error instead of returning the outcome of an empty run.
    logging.error('No devices to test.')
    return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)

  logging.info('Will run %d tests (%s): %s', len(tests), log_string, tests)
  runners = _CreateRunners(runner_factory, devices, setup_timeout)
  try:
    return _RunAllTests(runners, test_collection_factory,
                        num_retries, test_timeout, tag_results_with_device)
  finally:
    # Always tear the runners down; a device going away at this stage is only
    # logged so that it does not replace the outcome of the test run.
    try:
      _TearDownRunners(runners, setup_timeout)
    except android_commands.errors.DeviceUnresponsiveError as e:
      logging.warning('Device unresponsive during TearDown: [%s]', e)
394