1#!/usr/bin/env python
2# Copyright (C) 2011 Google Inc. All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8#     * Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10#     * Redistributions in binary form must reproduce the above
11# copyright notice, this list of conditions and the following disclaimer
12# in the documentation and/or other materials provided with the
13# distribution.
14#     * Neither the name of Google Inc. nor the names of its
15# contributors may be used to endorse or promote products derived from
16# this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30"""
This module provides TestRunner2, an alternate implementation of the
TestRunner class that uses the manager_worker_broker module to send sets of
tests to workers and receive their completion messages accordingly.
34"""
35
36import logging
37import time
38
39from webkitpy.tool import grammar
40
41from webkitpy.layout_tests.layout_package import manager_worker_broker
42from webkitpy.layout_tests.layout_package import test_runner
43from webkitpy.layout_tests.layout_package import worker
44
45
46_log = logging.getLogger(__name__)
47
48
class _WorkerState(object):
    """Bookkeeping record the TestRunner/manager keeps for a single worker."""

    def __init__(self, number, worker_connection):
        """Create the initial (idle, not done, not wedged) state.

        Args:
            number: the number assigned to this worker.
            worker_connection: connection object for the worker; its 'name'
                attribute is recorded in the stats.
        """
        # NOTE: the attribute assignment order is kept stable because
        # __repr__() dumps self.__dict__.
        self.worker_connection = worker_connection
        self.number = number
        # True once the worker has reported that it is finished.
        self.done = False
        # Name of the test currently running on the worker, if any.
        self.current_test_name = None
        # Time after which the current test counts as hung, or None.
        self.next_timeout = None
        # True once the manager has given up waiting on this worker.
        self.wedged = False
        # Per-worker totals reported back as timing statistics.
        self.stats = {
            'name': worker_connection.name,
            'num_tests': 0,
            'total_time': 0,
        }

    def __repr__(self):
        """Return a debugging representation listing every attribute."""
        return "_WorkerState(%s)" % (self.__dict__,)
66
67
68class TestRunner2(test_runner.TestRunner):
69    def __init__(self, port, options, printer):
70        test_runner.TestRunner.__init__(self, port, options, printer)
71        self._all_results = []
72        self._group_stats = {}
73        self._current_result_summary = None
74
75        # This maps worker names to the state we are tracking for each of them.
76        self._worker_states = {}
77
78    def is_done(self):
79        worker_states = self._worker_states.values()
80        return worker_states and all(self._worker_is_done(worker_state) for worker_state in worker_states)
81
82    def _worker_is_done(self, worker_state):
83        t = time.time()
84        if worker_state.done or worker_state.wedged:
85            return True
86
87        next_timeout = worker_state.next_timeout
88        WEDGE_PADDING = 40.0
89        if next_timeout and t > next_timeout + WEDGE_PADDING:
90            _log.error('')
91            worker_state.worker_connection.log_wedged_worker(worker_state.current_test_name)
92            _log.error('')
93            worker_state.wedged = True
94            return True
95        return False
96
97    def name(self):
98        return 'TestRunner2'
99
100    def _run_tests(self, file_list, result_summary):
101        """Runs the tests in the file_list.
102
103        Return: A tuple (interrupted, keyboard_interrupted, thread_timings,
104            test_timings, individual_test_timings)
105            interrupted is whether the run was interrupted
106            keyboard_interrupted is whether someone typed Ctrl^C
107            thread_timings is a list of dicts with the total runtime
108              of each thread with 'name', 'num_tests', 'total_time' properties
109            test_timings is a list of timings for each sharded subdirectory
110              of the form [time, directory_name, num_tests]
111            individual_test_timings is a list of run times for each test
112              in the form {filename:filename, test_run_time:test_run_time}
113            result_summary: summary object to populate with the results
114        """
115        self._current_result_summary = result_summary
116        self._all_results = []
117        self._group_stats = {}
118        self._worker_states = {}
119
120        keyboard_interrupted = False
121        interrupted = False
122        thread_timings = []
123
124        self._printer.print_update('Sharding tests ...')
125        test_lists = self._shard_tests(file_list,
126            (int(self._options.child_processes) > 1) and not self._options.experimental_fully_parallel)
127
128        num_workers = self._num_workers(len(test_lists))
129
130        manager_connection = manager_worker_broker.get(self._port, self._options,
131                                                       self, worker.Worker)
132
133        if self._options.dry_run:
134            return (keyboard_interrupted, interrupted, thread_timings,
135                    self._group_stats, self._all_results)
136
137        self._printer.print_update('Starting %s ...' %
138                                   grammar.pluralize('worker', num_workers))
139        for worker_number in xrange(num_workers):
140            worker_connection = manager_connection.start_worker(worker_number)
141            worker_state = _WorkerState(worker_number, worker_connection)
142            self._worker_states[worker_connection.name] = worker_state
143
144            # FIXME: If we start workers up too quickly, DumpRenderTree appears
145            # to thrash on something and time out its first few tests. Until
146            # we can figure out what's going on, sleep a bit in between
147            # workers.
148            time.sleep(0.1)
149
150        self._printer.print_update("Starting testing ...")
151        for test_list in test_lists:
152            manager_connection.post_message('test_list', test_list[0], test_list[1])
153
154        # We post one 'stop' message for each worker. Because the stop message
155        # are sent after all of the tests, and because each worker will stop
156        # reading messsages after receiving a stop, we can be sure each
157        # worker will get a stop message and hence they will all shut down.
158        for i in xrange(num_workers):
159            manager_connection.post_message('stop')
160
161        try:
162            while not self.is_done():
163                # We loop with a timeout in order to be able to detect wedged threads.
164                manager_connection.run_message_loop(delay_secs=1.0)
165
166            if any(worker_state.wedged for worker_state in self._worker_states.values()):
167                _log.error('')
168                _log.error('Remaining workers are wedged, bailing out.')
169                _log.error('')
170            else:
171                _log.debug('No wedged threads')
172
173            # Make sure all of the workers have shut down (if possible).
174            for worker_state in self._worker_states.values():
175                if not worker_state.wedged and worker_state.worker_connection.is_alive():
176                    worker_state.worker_connection.join(0.5)
177                    assert not worker_state.worker_connection.is_alive()
178
179        except KeyboardInterrupt:
180            _log.info("Interrupted, exiting")
181            self.cancel_workers()
182            keyboard_interrupted = True
183        except test_runner.TestRunInterruptedException, e:
184            _log.info(e.reason)
185            self.cancel_workers()
186            interrupted = True
187        except:
188            # Unexpected exception; don't try to clean up workers.
189            _log.info("Exception raised, exiting")
190            raise
191
192        thread_timings = [worker_state.stats for worker_state in self._worker_states.values()]
193
194        # FIXME: should this be a class instead of a tuple?
195        return (interrupted, keyboard_interrupted, thread_timings,
196                self._group_stats, self._all_results)
197
198    def cancel_workers(self):
199        for worker_state in self._worker_states.values():
200            worker_state.worker_connection.cancel()
201
202    def handle_started_test(self, source, test_info, hang_timeout):
203        worker_state = self._worker_states[source]
204        worker_state.current_test_name = self._port.relative_test_filename(test_info.filename)
205        worker_state.next_timeout = time.time() + hang_timeout
206
207    def handle_done(self, source):
208        worker_state = self._worker_states[source]
209        worker_state.done = True
210
211    def handle_exception(self, source, exception_info):
212        exception_type, exception_value, exception_traceback = exception_info
213        raise exception_type, exception_value, exception_traceback
214
215    def handle_finished_list(self, source, list_name, num_tests, elapsed_time):
216        self._group_stats[list_name] = (num_tests, elapsed_time)
217
218    def handle_finished_test(self, source, result, elapsed_time):
219        worker_state = self._worker_states[source]
220        worker_state.next_timeout = None
221        worker_state.current_test_name = None
222        worker_state.stats['total_time'] += elapsed_time
223        worker_state.stats['num_tests'] += 1
224
225        if worker_state.wedged:
226            # This shouldn't happen if we have our timeouts tuned properly.
227            _log.error("%s unwedged", source)
228
229        self._all_results.append(result)
230        self._update_summary_with_result(self._current_result_summary, result)
231