1# Copyright 2013 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Thread and ThreadGroup that reraise exceptions on the main thread.""" 6 7import logging 8import sys 9import threading 10import time 11import traceback 12 13import watchdog_timer 14 15 16class TimeoutError(Exception): 17 """Module-specific timeout exception.""" 18 pass 19 20 21def LogThreadStack(thread): 22 """Log the stack for the given thread. 23 24 Args: 25 thread: a threading.Thread instance. 26 """ 27 stack = sys._current_frames()[thread.ident] 28 logging.critical('*' * 80) 29 logging.critical('Stack dump for thread \'%s\'', thread.name) 30 logging.critical('*' * 80) 31 for filename, lineno, name, line in traceback.extract_stack(stack): 32 logging.critical('File: "%s", line %d, in %s', filename, lineno, name) 33 if line: 34 logging.critical(' %s', line.strip()) 35 logging.critical('*' * 80) 36 37 38class ReraiserThread(threading.Thread): 39 """Thread class that can reraise exceptions.""" 40 41 def __init__(self, func, args=[], kwargs={}, name=None): 42 """Initialize thread. 43 44 Args: 45 func: callable to call on a new thread. 46 args: list of positional arguments for callable, defaults to empty. 47 kwargs: dictionary of keyword arguments for callable, defaults to empty. 48 name: thread name, defaults to Thread-N. 49 """ 50 super(ReraiserThread, self).__init__(name=name) 51 self.daemon = True 52 self._func = func 53 self._args = args 54 self._kwargs = kwargs 55 self._exc_info = None 56 57 def ReraiseIfException(self): 58 """Reraise exception if an exception was raised in the thread.""" 59 if self._exc_info: 60 raise self._exc_info[0], self._exc_info[1], self._exc_info[2] 61 62 #override 63 def run(self): 64 """Overrides Thread.run() to add support for reraising exceptions.""" 65 try: 66 self._func(*self._args, **self._kwargs) 67 except: 68 self._exc_info = sys.exc_info() 69 raise 70 71 72class ReraiserThreadGroup(object): 73 """A group of ReraiserThread objects.""" 74 75 def __init__(self, threads=[]): 76 """Initialize thread group. 77 78 Args: 79 threads: a list of ReraiserThread objects; defaults to empty. 80 """ 81 self._threads = threads 82 83 def Add(self, thread): 84 """Add a thread to the group. 85 86 Args: 87 thread: a ReraiserThread object. 88 """ 89 self._threads.append(thread) 90 91 def StartAll(self): 92 """Start all threads.""" 93 for thread in self._threads: 94 thread.start() 95 96 def _JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)): 97 """Join all threads without stack dumps. 98 99 Reraises exceptions raised by the child threads and supports breaking 100 immediately on exceptions raised on the main thread. 101 102 Args: 103 watcher: Watchdog object providing timeout, by default waits forever. 104 """ 105 alive_threads = self._threads[:] 106 while alive_threads: 107 for thread in alive_threads[:]: 108 if watcher.IsTimedOut(): 109 raise TimeoutError('Timed out waiting for %d of %d threads.' % 110 (len(alive_threads), len(self._threads))) 111 # Allow the main thread to periodically check for interrupts. 112 thread.join(0.1) 113 if not thread.isAlive(): 114 alive_threads.remove(thread) 115 # All threads are allowed to complete before reraising exceptions. 116 for thread in self._threads: 117 thread.ReraiseIfException() 118 119 def JoinAll(self, watcher=watchdog_timer.WatchdogTimer(None)): 120 """Join all threads. 121 122 Reraises exceptions raised by the child threads and supports breaking 123 immediately on exceptions raised on the main thread. Unfinished threads' 124 stacks will be logged on watchdog timeout. 125 126 Args: 127 watcher: Watchdog object providing timeout, by default waits forever. 128 """ 129 try: 130 self._JoinAll(watcher) 131 except TimeoutError: 132 for thread in (t for t in self._threads if t.isAlive()): 133 LogThreadStack(thread) 134 raise 135