# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging
import math
import threading
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.tool import grammar


_log = logging.getLogger(__name__)


TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason
        self.msg = reason

    # __reduce__ keeps this exception picklable; Exception subclasses whose
    # __init__ takes required arguments do not unpickle cleanly by default.
    def __reduce__(self):
        return self.__class__, (self.reason,)


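# LayoutTestRunner drives a pool of Worker objects over a sharded list of
# tests and aggregates the results. A rough usage sketch (the caller and
# variable names here are hypothetical; in WebKit the test manager drives
# this class):
#
#   runner = LayoutTestRunner(options, port, printer, results_dir, test_is_slow_fn)
#   results = runner.run_tests(expectations, test_inputs, tests_to_skip=set(),
#                              num_workers=4, retrying=False)
#
# run_tests() shards the inputs, runs the locked shards (e.g. http tests)
# first, and returns a TestRunResults object.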
class LayoutTestRunner(object):
    def __init__(self, options, port, printer, results_directory, test_is_slow_fn):
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._sharder = Sharder(self._port.split_test, self._options.max_locked_shards)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False

        self._current_run_results = None

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        self._expectations = expectations
        self._test_inputs = test_inputs
        self._retrying = retrying
        self._shards_to_redo = []

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_completed = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs,
            int(self._options.child_processes), self._options.fully_parallel,
            self._options.run_singly or (self._options.batch_size == 1))

        # We don't have a good way to coordinate the workers so that they don't
        # try to run the shards that need a lock. The easiest solution is to
        # run all of the locked shards first.
        all_shards = locked_shards + unlocked_shards
        num_workers = min(num_workers, len(all_shards))
        self._printer.print_workers_and_shards(num_workers, len(all_shards), len(locked_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize('worker', num_workers))

        start_time = time.time()
        try:
            with message_pool.get(self, self._worker_factory, num_workers, self._port.host) as pool:
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)

            if self._shards_to_redo:
                num_workers -= len(self._shards_to_redo)
                if num_workers > 0:
                    with message_pool.get(self, self._worker_factory, num_workers, self._port.host) as pool:
                        pool.run(('test_list', shard.name, shard.test_inputs) for shard in self._shards_to_redo)
        except TestRunInterruptedException, e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            run_results.keyboard_interrupted = True
        except Exception, e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise
        finally:
            run_results.run_time = time.time() - start_time

        return run_results

    def _worker_factory(self, worker_connection):
        results_directory = self._results_directory
        if self._retrying:
            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
            results_directory = self._filesystem.join(self._results_directory, 'retries')
        return Worker(worker_connection, results_directory, self._options)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

    def _update_summary_with_result(self, run_results, result):
        expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type, self._options.enable_sanitizer)
        exp_str = self._expectations.get_expectations_string(result.test_name)
        got_str = self._expectations.expectation_to_string(result.type)

        if result.device_failed:
            self._printer.print_finished_test(result, False, exp_str, "Aborted")
            return

        run_results.add(result, expected, self._test_is_slow(result.test_name))
        self._printer.print_finished_test(result, expected, exp_str, got_str)
        self._interrupt_if_at_failure_limits(run_results)

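    # Messages posted by workers are dispatched to the matching _handle_*
    # method below; for example, a worker's post('finished_test', result)
    # arrives here as handle('finished_test', worker_name, result) and is
    # routed to _handle_finished_test().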
    def handle(self, name, source, *args):
        method = getattr(self, '_handle_' + name, None)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test_list(self, worker_name, list_name):
        pass

    def _handle_finished_test(self, worker_name, result, log_messages=[]):
        self._update_summary_with_result(self._current_run_results, result)

    def _handle_device_failed(self, worker_name, list_name, remaining_tests):
        _log.warning("%s has failed" % worker_name)
        if remaining_tests:
            self._shards_to_redo.append(TestShard(list_name, remaining_tests))

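
# A Worker is created in the manager process by _worker_factory() and handed
# to the message pool, which typically runs it in a child process. start()
# is called once in that process before any messages arrive; handle() then
# runs each 'test_list' shard one test at a time, posting 'started_test' and
# 'finished_test' messages back to LayoutTestRunner.handle().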
class Worker(object):
    def __init__(self, caller, results_directory, options):
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

    def __del__(self):
        self.stop()

    def start(self):
        """This method is called when the object is starting to be used and it is safe
        for the object to create state that does not need to be pickled (usually this means
        it is called in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

    def handle(self, name, source, test_list_name, test_inputs):
        assert name == 'test_list'
        for i, test_input in enumerate(test_inputs):
            device_failed = self._run_test(test_input, test_list_name)
            if device_failed:
                self._caller.post('device_failed', test_list_name, test_inputs[i:])
                self._caller.stop_running()
                return

        self._caller.post('finished_test_list', test_list_name)

    def _update_test_input(self, test_input):
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

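    # _run_test() also implements batching: once _batch_size tests have run,
    # stop_when_done asks the single test runner to shut the driver down after
    # the test, so the next test gets a fresh driver process. For example, if
    # options.batch_size is 100, the driver is restarted every 100 tests; a
    # batch size of 0 (the fallback when the option is unset) means the driver
    # is reused indefinitely.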
    def _run_test(self, test_input, shard_name):
        self._batch_count += 1

        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()
        device_failed = False

        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number)

        if not self._driver:
            # FIXME: Is this the best way to handle a device crashing in the middle of the test, or should we create
            # a new failure type?
            device_failed = True
            return device_failed

        self._caller.post('started_test', test_input, test_timeout_sec)
        result = single_test_runner.run_single_test(self._port, self._options, self._results_directory,
            self._name, self._driver, test_input, stop_when_done)

        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1
        self._caller.post('finished_test', result)
        self._clean_up_after_test(test_input, result)
        return result.device_failed

    def stop(self):
        _log.debug("%s cleaning up" % self._name)
        self._kill_driver()

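    # As a worked example of the computation below: a test timeout of 6000 ms
    # becomes an 18.0 second driver timeout (3.0 * 6000 / 1000).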
    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test."""
        # The driver watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.

        # FIXME: Can we just return the test_input.timeout now?
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        return driver_timeout_sec

    def _kill_driver(self):
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _clean_up_after_test(self, test_input, result):
        test_name = test_input.test_name

        if result.failures:
            # Check and kill the driver if we need to.
            if any([f.driver_needs_restart() for f in result.failures]):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s  %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))


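# A hypothetical shard might look like TestShard('css3/flexbox', inputs),
# where all of the inputs come from the same directory and agree on whether
# they need the server lock (requires_lock is simply taken from the first
# input).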
class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        self.name = name
        self.test_inputs = test_inputs
        self.requires_lock = test_inputs[0].requires_lock

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, requires_lock=%s)" % (self.name, self.test_inputs, self.requires_lock)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs


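# Sharder groups TestInputs into TestShards. shard_tests() picks a strategy
# roughly as follows:
#
#   num_workers == 1  -> _shard_in_two():       one locked and one unlocked shard
#   fully_parallel    -> _shard_every_file():   one shard per test file
#   otherwise         -> _shard_by_directory(): one shard per directory
#
# A hypothetical invocation (the inputs and port are made up for illustration):
#
#   sharder = Sharder(port.split_test, max_locked_shards=1)
#   locked, unlocked = sharder.shard_tests(test_inputs, num_workers=4,
#                                          fully_parallel=False, run_singly=False)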
class Sharder(object):
    def __init__(self, test_split_fn, max_locked_shards):
        self._split = test_split_fn
        self._max_locked_shards = max_locked_shards

    def shard_tests(self, test_inputs, num_workers, fully_parallel, run_singly):
        """Groups tests into batches.
        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together, as most cross-test dependencies tend to
        occur within the same directory.
        Returns:
            Two lists of TestShards. The first contains tests that must only be
            run under the server lock; the second can be run whenever.
        """

        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return self._shard_in_two(test_inputs)
        elif fully_parallel:
            return self._shard_every_file(test_inputs, run_singly)
        return self._shard_by_directory(test_inputs)

    def _shard_in_two(self, test_inputs):
        """Returns two lists of shards, one with all the tests requiring a lock and one with the rest.

        This is used when there's only one worker, to minimize the per-shard overhead."""
        locked_inputs = []
        unlocked_inputs = []
        for test_input in test_inputs:
            if test_input.requires_lock:
                locked_inputs.append(test_input)
            else:
                unlocked_inputs.append(test_input)

        locked_shards = []
        unlocked_shards = []
        if locked_inputs:
            locked_shards = [TestShard('locked_tests', locked_inputs)]
        if unlocked_inputs:
            unlocked_shards = [TestShard('unlocked_tests', unlocked_inputs)]

        return locked_shards, unlocked_shards

    def _shard_every_file(self, test_inputs, run_singly):
        """Returns two lists of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        locked_shards = []
        unlocked_shards = []
        virtual_inputs = []

        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            if test_input.requires_lock:
                locked_shards.append(TestShard('.', [test_input]))
            elif test_input.test_name.startswith('virtual') and not run_singly:
                # This violates the spirit of sharding every file, but in practice, since the
                # virtual test suites require a different commandline flag and thus a restart
                # of content_shell, it's too slow to shard them fully.
                virtual_inputs.append(test_input)
            else:
                unlocked_shards.append(TestShard('.', [test_input]))

        locked_virtual_shards, unlocked_virtual_shards = self._shard_by_directory(virtual_inputs)

        # The locked shards still need to be limited to self._max_locked_shards in order to not
        # overload the http server for the http tests.
        return (self._resize_shards(locked_virtual_shards + locked_shards, self._max_locked_shards, 'locked_shard'),
            unlocked_virtual_shards + unlocked_shards)

    def _shard_by_directory(self, test_inputs):
        """Returns two lists of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
        locked_shards = []
        unlocked_shards = []
        unlocked_slow_shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, [])
            tests_by_dir[directory].append(test_input)

        for directory, test_inputs in tests_by_dir.iteritems():
            shard = TestShard(directory, test_inputs)
            if test_inputs[0].requires_lock:
                locked_shards.append(shard)
            # In practice, virtual test suites are slow to run. It's a bit hacky, but
            # put them first since they're the long-tail of test runtime.
            elif directory.startswith('virtual'):
                unlocked_slow_shards.append(shard)
            else:
                unlocked_shards.append(shard)

        # Sort the shards by directory name.
        locked_shards.sort(key=lambda shard: shard.name)
        unlocked_slow_shards.sort(key=lambda shard: shard.name)
        unlocked_shards.sort(key=lambda shard: shard.name)

        # Put a ceiling on the number of locked shards, so that we
        # don't hammer the servers too badly.

        # FIXME: For now, limit to one shard or set it
        # with the --max-locked-shards. After testing to make sure we
        # can handle multiple shards, we should probably do something like
        # limit this to no more than a quarter of all workers, e.g.:
        # return max(math.ceil(num_workers / 4.0), 1)
        return (self._resize_shards(locked_shards, self._max_locked_shards, 'locked_shard'),
                unlocked_slow_shards + unlocked_shards)

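    # A worked example of _resize_shards(): given 10 single-directory locked
    # shards and max_new_shards=4, each new shard absorbs ceil(10 / 4) = 3 old
    # shards, producing 4 shards ('locked_shard_1' .. 'locked_shard_4') that
    # contain 3, 3, 3 and 1 directories respectively.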
    def _resize_shards(self, old_shards, max_new_shards, shard_name_prefix):
        """Takes a list of shards and redistributes the tests into no more
        than |max_new_shards| new shards."""

        # This implementation assumes that each input shard only contains tests from a
        # single directory, and that tests in each shard must remain together; as a
        # result, a given input shard is never split between output shards.
        #
        # Each output shard contains the tests from one or more input shards and
        # hence may contain tests from multiple directories.

        def divide_and_round_up(numerator, divisor):
            return int(math.ceil(float(numerator) / divisor))

        def extract_and_flatten(shards):
            test_inputs = []
            for shard in shards:
                test_inputs.extend(shard.test_inputs)
            return test_inputs

        def split_at(seq, index):
            return (seq[:index], seq[index:])

        num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards)
        new_shards = []
        remaining_shards = old_shards
        while remaining_shards:
            some_shards, remaining_shards = split_at(remaining_shards, num_old_per_new)
            new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_shards) + 1), extract_and_flatten(some_shards)))
        return new_shards