1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import sys
7import threading
8import time
9from autotest_lib.client.common_lib import error
10
11
class BaseStressor(threading.Thread):
    """
    Implements common functionality for *Stressor classes.

    The stressor runs on a daemon thread. Subclasses implement
    _loop_stressor() to decide how often |stressor| is invoked. When
    escalation is enabled, an exception raised on the stressor thread is
    recorded so that another thread can re-raise it through reraise().

    @var stressor: callable which performs a single stress event.
    @var on_exit: optional callable invoked when the thread finishes.
    """
    def __init__(self, stressor, on_exit=None, escalate_exceptions=True):
        """
        Initialize the BaseStressor.

        @param stressor: callable which performs a single stress event.
        @param on_exit: callable which will be called when the thread finishes.
        @param escalate_exceptions: whether to escalate exceptions to the parent
            thread; defaults to True.
        """
        super(BaseStressor, self).__init__()
        self.daemon = True
        self.stressor = stressor
        self.on_exit = on_exit
        self._escalate_exceptions = escalate_exceptions
        self._exc_info = None
        # start() overwrites these; the defaults keep run() well-defined if it
        # is ever invoked before start() has recorded the caller's values.
        self._start_condition = None
        self._start_timeout_secs = None


    def start(self, start_condition=None, start_timeout_secs=None):
        """
        Creates a new thread which will call the run() method.

        Optionally takes a wait condition before the stressor loop. Returns
        immediately.

        @param start_condition: the new thread will wait until this optional
            callable returns True before running the stressor.
        @param start_timeout_secs: how long to wait for |start_condition| to
            become True, or None to wait forever.
        """
        self._start_condition = start_condition
        self._start_timeout_secs = start_timeout_secs
        super(BaseStressor, self).start()


    def run(self):
        """
        Wait for |_start_condition|, and then start the stressor loop.

        Overloaded from threading.Thread. This is run in a separate thread when
        start() is called. |on_exit|, if set, is always called on the way out,
        whether the loop finished normally or raised.
        """
        try:
            self._wait_for_start_condition()
            self._loop_stressor()
        except Exception:
            if self._escalate_exceptions:
                # Keep the full exc_info so reraise() can preserve the
                # original traceback.
                self._exc_info = sys.exc_info()
            raise  # Terminates this thread. Caller continues to run.
        finally:
            if self.on_exit:
                self.on_exit()


    def _wait_for_start_condition(self):
        """
        Loop until _start_condition() returns True, or _start_timeout_secs
        have elapsed.

        The condition is polled once per second. A timeout of None waits
        forever; a timeout of 0 fails as soon as the condition is found to be
        false.

        @raise error.TestFail if we time out waiting for the start condition
        """
        if self._start_condition is None:
            return

        timeout_secs = self._start_timeout_secs
        elapsed_secs = 0
        while not self._start_condition():
            # Compare against None explicitly: a truthiness test would treat a
            # timeout of 0 as "wait forever".
            if timeout_secs is not None and elapsed_secs >= timeout_secs:
                raise error.TestFail('start condition did not become true '
                                     'within %d seconds' % timeout_secs)
            time.sleep(1)
            elapsed_secs += 1


    def _loop_stressor(self):
        """
        Apply stressor in a loop.

        Overloaded by the particular *Stressor.
        """
        raise NotImplementedError


    def reraise(self):
        """
        Reraise an exception raised in the thread's stress loop.

        The recorded exception is cleared before it is raised, so each
        exception is re-raised at most once. This is a No-op if no exception
        was raised.
        """
        if not self._exc_info:
            return

        exc_type, exc_value, exc_tb = self._exc_info
        self._exc_info = None
        if hasattr(exc_value, 'with_traceback'):
            # Python 3 exceptions carry with_traceback().
            raise exc_value.with_traceback(exc_tb)
        # Python 2: the three-expression raise is a syntax error on Python 3,
        # so it is kept inside a string and only compiled when reached.
        exec('raise exc_type, exc_value, exc_tb')
111
112
class ControlledStressor(BaseStressor):
    """
    Run a stressor in loop on a separate thread.

    Creates a new thread and calls |stressor| in a loop until stop() is called.
    """
    def __init__(self, stressor, on_exit=None, escalate_exceptions=True):
        """
        Initialize the ControlledStressor.

        @param stressor: callable which performs a single stress event.
        @param on_exit: callable which will be called when the thread finishes.
        @param escalate_exceptions: whether to escalate exceptions to the parent
            thread when stop() is called; defaults to True.
        """
        # Set by stop() to ask the loop to finish after the current iteration.
        self._complete = threading.Event()
        super(ControlledStressor, self).__init__(stressor, on_exit,
                                                 escalate_exceptions)


    def _loop_stressor(self):
        """Overloaded from parent.

        Calls |stressor| repeatedly until stop() sets the completion event.
        The event is only checked between iterations, so a stress event that
        is already running is allowed to finish.
        """
        iteration_num = 0
        while not self._complete.is_set():
            iteration_num += 1
            logging.info('Stressor iteration: %d', iteration_num)
            self.stressor()


    def start(self, start_condition=None, start_timeout_secs=None):
        """Start applying the stressor.

        Overloaded from parent.

        @param start_condition: the new thread will wait until this optional
            callable returns True before running the stressor.
        @param start_timeout_secs: how long to wait for |start_condition| to
            become True, or None to wait forever.
        """
        self._complete.clear()
        super(ControlledStressor, self).start(start_condition,
                                              start_timeout_secs)


    def stop(self, timeout=45):
        """
        Stop applying the stressor.

        Signals the loop to finish, waits for the thread, and then re-raises
        any exception recorded by the stressor thread.

        @param timeout: maximum time to wait for a single run of the stressor to
            complete, defaults to 45 seconds.
        """
        self._complete.set()
        self.join(timeout)
        # join() returns None whether or not it timed out, so the only way to
        # notice a stressor that is still running is to ask the thread.
        if self.is_alive():
            logging.warning('Stressor thread is still running %s seconds '
                            'after stop() was requested', timeout)
        self.reraise()
167
168
class CountedStressor(BaseStressor):
    """
    Run a stressor in a loop on a separate thread a given number of times.

    Creates a new thread and calls |stressor| in a loop |iterations| times. The
    calling thread can use wait() to block until the loop completes. If the
    stressor thread terminates with an exception, wait() will propagate that
    exception to the thread that called wait().
    """
    def _loop_stressor(self):
        """Overloaded from parent.

        Calls |stressor| once per iteration, for the number of iterations
        recorded by start().
        """
        # range() rather than xrange(): xrange does not exist on Python 3.
        for iteration_num in range(1, self._iterations + 1):
            logging.info('Stressor iteration: %d of %d',
                         iteration_num, self._iterations)
            self.stressor()


    def start(self, iterations, start_condition=None, start_timeout_secs=None):
        """
        Apply the stressor a given number of times.

        Overloaded from parent.

        @param iterations: number of times to apply the stressor.
        @param start_condition: the new thread will wait until this optional
            callable returns True before running the stressor.
        @param start_timeout_secs: how long to wait for |start_condition| to
            become True, or None to wait forever.
        """
        self._iterations = iterations
        super(CountedStressor, self).start(start_condition, start_timeout_secs)


    def wait(self, timeout=None):
        """Wait until the stressor completes.

        After waiting, re-raises any exception recorded by the stressor
        thread.

        @param timeout: maximum time for the thread to complete, by default
            never times out.
        """
        self.join(timeout)
        # join() gives no indication of a timeout; report a loop that has not
        # finished instead of returning as though it had.
        if self.is_alive():
            logging.warning('Stressor thread is still running after waiting '
                            '%s seconds', timeout)
        self.reraise()
210