1# Copyright 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Thread and ThreadGroup that reraise exceptions on the main thread."""
6
7import logging
8import sys
9import threading
10import time
11import traceback
12
13import watchdog_timer
14
15
class TimeoutError(Exception):
  """Raised by this module when waiting on threads exceeds its time limit.

  Derives directly from Exception and is specific to this module.
  """
19
20
def LogThreadStack(thread):
  """Log the stack for the given thread at CRITICAL level.

  The thread's topmost frame is looked up by thread.ident in
  sys._current_frames(). If there is no entry for that ident (for example the
  thread was never started, or it exited before this call), a single message
  saying so is logged in place of the frames instead of raising KeyError.

  Args:
    thread: a threading.Thread instance.
  """
  # .get() rather than [] so that a diagnostic helper never raises itself:
  # callers typically invoke this while already handling a timeout, and a
  # KeyError here would hide the original error.
  stack = sys._current_frames().get(thread.ident)
  banner = '*' * 80
  logging.critical(banner)
  logging.critical('Stack dump for thread \'%s\'', thread.name)
  logging.critical(banner)
  if stack is None:
    logging.critical('No stack available; thread is not running.')
  else:
    for filename, lineno, name, line in traceback.extract_stack(stack):
      logging.critical('File: "%s", line %d, in %s', filename, lineno, name)
      # The source line may be unavailable (e.g. no source file on disk).
      if line:
        logging.critical('  %s', line.strip())
  logging.critical(banner)
36
37
class ReraiserThread(threading.Thread):
  """Thread class that can reraise exceptions.

  The thread runs func(*args, **kwargs). If that call raises, the exception
  info is recorded so that another thread can later rethrow it by calling
  ReraiseIfException().
  """

  def __init__(self, func, args=None, kwargs=None, name=None):
    """Initialize thread.

    The thread is always created as a daemon thread.

    Args:
      func: callable to call on a new thread.
      args: list of positional arguments for callable, defaults to empty.
      kwargs: dictionary of keyword arguments for callable, defaults to empty.
      name: thread name, defaults to Thread-N.
    """
    super(ReraiserThread, self).__init__(name=name)
    self.daemon = True
    self._func = func
    # None (not []/{}) is the default so that instances never share one
    # mutable default object.
    self._args = [] if args is None else args
    self._kwargs = {} if kwargs is None else kwargs
    # (type, value, traceback) from sys.exc_info() if func raised, else None.
    self._exc_info = None

  def ReraiseIfException(self):
    """Reraise exception if an exception was raised in the thread.

    Does nothing if the thread's callable has not raised.

    Raises:
      Whatever exception the callable raised, with its original traceback.
    """
    if not self._exc_info:
      return
    exc_type, exc_value, exc_tb = self._exc_info
    if sys.version_info[0] >= 3:
      raise exc_value.with_traceback(exc_tb)
    # Python 2 only: the three-argument raise statement is a syntax error on
    # Python 3, so it is hidden inside a constant string to keep this module
    # importable on both major versions.
    exec('raise exc_type, exc_value, exc_tb', {},
         {'exc_type': exc_type, 'exc_value': exc_value, 'exc_tb': exc_tb})

  #override
  def run(self):
    """Overrides Thread.run() to add support for reraising exceptions."""
    try:
      self._func(*self._args, **self._kwargs)
    except:
      # Deliberately a bare except: every exception is recorded for
      # ReraiseIfException() and then propagated out of run() unchanged.
      self._exc_info = sys.exc_info()
      raise
70
71
class ReraiserThreadGroup(object):
  """A group of ReraiserThread objects.

  Threads are started together with StartAll() and waited on with JoinAll(),
  which rethrows any exception recorded by a member thread.
  """

  def __init__(self, threads=None):
    """Initialize thread group.

    Args:
      threads: a list of ReraiserThread objects; defaults to empty. The list
        is copied, so later Add() calls do not modify the caller's list.
    """
    # A fresh list per group: a shared mutable default would make every
    # default-constructed group append into the same list.
    self._threads = [] if threads is None else list(threads)

  def Add(self, thread):
    """Add a thread to the group.

    Args:
      thread: a ReraiserThread object.
    """
    self._threads.append(thread)

  def StartAll(self):
    """Start all threads."""
    for thread in self._threads:
      thread.start()

  def _JoinAll(self, watcher=None):
    """Join all threads without stack dumps.

    Reraises exceptions raised by the child threads and supports breaking
    immediately on exceptions raised on the main thread.

    Args:
      watcher: Watchdog object providing timeout, by default waits forever.

    Raises:
      TimeoutError: if watcher reports a timeout while threads are still
        being waited on.
    """
    if watcher is None:
      # Built per call rather than once at definition time, so calls do not
      # share a single watchdog instance.
      watcher = watchdog_timer.WatchdogTimer(None)
    alive_threads = self._threads[:]
    while alive_threads:
      # Iterate over a copy because finished threads are removed in the loop.
      for thread in alive_threads[:]:
        if watcher.IsTimedOut():
          raise TimeoutError('Timed out waiting for %d of %d threads.' %
                             (len(alive_threads), len(self._threads)))
        # Allow the main thread to periodically check for interrupts.
        thread.join(0.1)
        if not thread.is_alive():
          alive_threads.remove(thread)
    # All threads are allowed to complete before reraising exceptions.
    for thread in self._threads:
      thread.ReraiseIfException()

  def JoinAll(self, watcher=None):
    """Join all threads.

    Reraises exceptions raised by the child threads and supports breaking
    immediately on exceptions raised on the main thread. Unfinished threads'
    stacks will be logged on watchdog timeout.

    Args:
      watcher: Watchdog object providing timeout, by default waits forever.

    Raises:
      TimeoutError: if watcher reports a timeout; raised after the stacks of
        the still-running threads have been logged.
    """
    try:
      self._JoinAll(watcher)
    except TimeoutError:
      for thread in self._threads:
        if thread.is_alive():
          LogThreadStack(thread)
      raise
135