run.py revision 8d93aa0de330e24fe1cec7a915bdf1354da377fa
1import os 2import threading 3import time 4import traceback 5try: 6 import Queue as queue 7except ImportError: 8 import queue 9 10try: 11 import win32api 12except ImportError: 13 win32api = None 14 15try: 16 import multiprocessing 17except ImportError: 18 multiprocessing = None 19 20import lit.Test 21 22### 23# Test Execution Implementation 24 25class LockedValue(object): 26 def __init__(self, value): 27 self.lock = threading.Lock() 28 self._value = value 29 30 def _get_value(self): 31 self.lock.acquire() 32 try: 33 return self._value 34 finally: 35 self.lock.release() 36 37 def _set_value(self, value): 38 self.lock.acquire() 39 try: 40 self._value = value 41 finally: 42 self.lock.release() 43 44 value = property(_get_value, _set_value) 45 46class TestProvider(object): 47 def __init__(self, tests, num_jobs, queue_impl, canceled_flag): 48 self.canceled_flag = canceled_flag 49 50 # Create a shared queue to provide the test indices. 51 self.queue = queue_impl() 52 for i in range(len(tests)): 53 self.queue.put(i) 54 for i in range(num_jobs): 55 self.queue.put(None) 56 57 def cancel(self): 58 self.canceled_flag.value = 1 59 60 def get(self): 61 # Check if we are canceled. 62 if self.canceled_flag.value: 63 return None 64 65 # Otherwise take the next test. 66 return self.queue.get() 67 68class Tester(object): 69 def __init__(self, run_instance, provider, consumer): 70 self.run_instance = run_instance 71 self.provider = provider 72 self.consumer = consumer 73 74 def run(self): 75 while True: 76 item = self.provider.get() 77 if item is None: 78 break 79 self.run_test(item) 80 self.consumer.task_finished() 81 82 def run_test(self, test_index): 83 test = self.run_instance.tests[test_index] 84 try: 85 self.run_instance.execute_test(test) 86 except KeyboardInterrupt: 87 # This is a sad hack. Unfortunately subprocess goes 88 # bonkers with ctrl-c and we start forking merrily. 89 print('\nCtrl-C detected, goodbye.') 90 os.kill(0,9) 91 self.consumer.update(test_index, test) 92 93class ThreadResultsConsumer(object): 94 def __init__(self, display): 95 self.display = display 96 self.lock = threading.Lock() 97 98 def update(self, test_index, test): 99 self.lock.acquire() 100 try: 101 self.display.update(test) 102 finally: 103 self.lock.release() 104 105 def task_finished(self): 106 pass 107 108 def handle_results(self): 109 pass 110 111class MultiprocessResultsConsumer(object): 112 def __init__(self, run, display, num_jobs): 113 self.run = run 114 self.display = display 115 self.num_jobs = num_jobs 116 self.queue = multiprocessing.Queue() 117 118 def update(self, test_index, test): 119 # This method is called in the child processes, and communicates the 120 # results to the actual display implementation via an output queue. 121 self.queue.put((test_index, test.result)) 122 123 def task_finished(self): 124 # This method is called in the child processes, and communicates that 125 # individual tasks are complete. 126 self.queue.put(None) 127 128 def handle_results(self): 129 # This method is called in the parent, and consumes the results from the 130 # output queue and dispatches to the actual display. The method will 131 # complete after each of num_jobs tasks has signalled completion. 132 completed = 0 133 while completed != self.num_jobs: 134 # Wait for a result item. 135 item = self.queue.get() 136 if item is None: 137 completed += 1 138 continue 139 140 # Update the test result in the parent process. 141 index,result = item 142 test = self.run.tests[index] 143 test.result = result 144 145 self.display.update(test) 146 147def run_one_tester(run, provider, display): 148 tester = Tester(run, provider, display) 149 tester.run() 150 151### 152 153class Run(object): 154 """ 155 This class represents a concrete, configured testing run. 156 """ 157 158 def __init__(self, lit_config, tests): 159 self.lit_config = lit_config 160 self.tests = tests 161 162 def execute_test(self, test): 163 result = None 164 start_time = time.time() 165 try: 166 result = test.config.test_format.execute(test, self.lit_config) 167 168 # Support deprecated result from execute() which returned the result 169 # code and additional output as a tuple. 170 if isinstance(result, tuple): 171 code, output = result 172 result = lit.Test.Result(code, output) 173 elif not isinstance(result, lit.Test.Result): 174 raise ValueError("unexpected result from test execution") 175 except KeyboardInterrupt: 176 raise 177 except: 178 if self.lit_config.debug: 179 raise 180 output = 'Exception during script execution:\n' 181 output += traceback.format_exc() 182 output += '\n' 183 result = lit.Test.Result(lit.Test.UNRESOLVED, output) 184 result.elapsed = time.time() - start_time 185 186 test.setResult(result) 187 188 def execute_tests(self, display, jobs, max_time=None, 189 use_processes=False): 190 """ 191 execute_tests(display, jobs, [max_time]) 192 193 Execute each of the tests in the run, using up to jobs number of 194 parallel tasks, and inform the display of each individual result. The 195 provided tests should be a subset of the tests available in this run 196 object. 197 198 If max_time is non-None, it should be a time in seconds after which to 199 stop executing tests. 200 201 The display object will have its update method called with each test as 202 it is completed. The calls are guaranteed to be locked with respect to 203 one another, but are *not* guaranteed to be called on the same thread as 204 this method was invoked on. 205 206 Upon completion, each test in the run will have its result 207 computed. Tests which were not actually executed (for any reason) will 208 be given an UNRESOLVED result. 209 """ 210 211 # Choose the appropriate parallel execution implementation. 212 consumer = None 213 if jobs != 1 and use_processes and multiprocessing: 214 try: 215 task_impl = multiprocessing.Process 216 queue_impl = multiprocessing.Queue 217 canceled_flag = multiprocessing.Value('i', 0) 218 consumer = MultiprocessResultsConsumer(self, display, jobs) 219 except ImportError: 220 # Workaround for BSD: http://bugs.python.org/issue3770 221 consumer = None 222 if not consumer: 223 task_impl = threading.Thread 224 queue_impl = queue.Queue 225 canceled_flag = LockedValue(0) 226 consumer = ThreadResultsConsumer(display) 227 228 # Create the test provider. 229 provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag) 230 231 # Install a console-control signal handler on Windows. 232 if win32api is not None: 233 def console_ctrl_handler(type): 234 provider.cancel() 235 return True 236 win32api.SetConsoleCtrlHandler(console_ctrl_handler, True) 237 238 # Install a timeout handler, if requested. 239 if max_time is not None: 240 def timeout_handler(): 241 provider.cancel() 242 timeout_timer = threading.Timer(max_time, timeout_handler) 243 timeout_timer.start() 244 245 # If not using multiple tasks, just run the tests directly. 246 if jobs == 1: 247 run_one_tester(self, provider, consumer) 248 else: 249 # Otherwise, execute the tests in parallel 250 self._execute_tests_in_parallel(task_impl, provider, consumer, jobs) 251 252 # Cancel the timeout handler. 253 if max_time is not None: 254 timeout_timer.cancel() 255 256 # Update results for any tests which weren't run. 257 for test in self.tests: 258 if test.result is None: 259 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0)) 260 261 def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs): 262 # Start all of the tasks. 263 tasks = [task_impl(target=run_one_tester, 264 args=(self, provider, consumer)) 265 for i in range(jobs)] 266 for t in tasks: 267 t.start() 268 269 # Allow the consumer to handle results, if necessary. 270 consumer.handle_results() 271 272 # Wait for all the tasks to complete. 273 for t in tasks: 274 t.join() 275