1#!/usr/bin/env python
2###
3### Copyright (C) 2011 Texas Instruments
4###
5### Licensed under the Apache License, Version 2.0 (the "License");
6### you may not use this file except in compliance with the License.
7### You may obtain a copy of the License at
8###
9###      http://www.apache.org/licenses/LICENSE-2.0
10###
11### Unless required by applicable law or agreed to in writing, software
12### distributed under the License is distributed on an "AS IS" BASIS,
13### WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14### See the License for the specific language governing permissions and
15### limitations under the License.
16###
17
18"""TestFlinger meta-test execution framework
19
20When writing a master test script that runs several scripts, this module
21can be used to execute those tests in a detached process (sandbox).
22Thus, if the test case fails by a segfault or timeout, this can be
23detected and the upper-level script simply moves on to the next script.
24"""
25
26import os
27import time
28import subprocess
29import sys
30import time
31
# Default upper limit, in seconds, on how long a test may run.  TestCase
# uses this value when its test dictionary supplies no 'timeout'.
g_default_timeout = 300
33
class TestCase:
    """Run one test program in a child process and judge its outcome.

    Typical use is start(), then wait(), then verdict().

    The child's stdout and stderr share a single pipe.  A background
    thread copies that pipe into the log file (or simply discards the
    data when there is no log file), so reading the output can never
    delay the timeout check and a chatty child can never stall on a
    full pipe.
    """

    # Seconds between checks for child exit while waiting.
    _poll_interval = 0.5
    # Seconds the child gets to exit after terminate(), and again after kill().
    _kill_grace = 2.0
    # Seconds the output reader gets to finish once the child is gone.
    _drain_grace = 2.0

    def __init__(self, TestDict = None, Logfile = None):
        """Set up the test runner object.

        TestDict: dictionary with the test properties.  (string: value).  The
                  recognized properties are:

                  filename - name of executable test file
                      Type: string
                      Required: yes (KeyError is raised when it is missing)

                  args - command line arguments for test
                      Type: list of strings, or None
                      Required: no
                      Default: None

                  timeout - upper limit on execution time (secs).  If test takes
                      this long to run, then it is deemed a failure
                      Type: integer
                      Required: no
                      Default: TestFlinger.g_default_timeout (typ. 300 sec)

                  expect-fail - If the test is expected to fail (return non-zero)
                      in order to pass, set this to True
                      Type: bool
                      Required: no
                      Default: False

                  expect-signal - If the test is expected to fail because of
                      a signal (e.g. SIGTERM, SIGSEGV) then this is considered success
                      Type: bool
                      Required: no
                      Default: False

        Logfile: an object with write() and flush() where stdout/stderr for
            the tests should be dumped.  If None, then no logging will be
            done.  (See also TestFlinger.setup_logfile() and
            TestFlinger.close_logfile().)
        """
        if TestDict is None:
            # A fresh dict per call; a dict literal as the default would be
            # one object shared by every instance.
            TestDict = {}

        self._program = TestDict['filename']
        self._args = TestDict.get('args')

        timeout = TestDict.get('timeout')
        if timeout is None:
            # The module default is looked up at construction time.
            timeout = g_default_timeout
        self._timeout = timeout

        expect_fail = TestDict.get('expect-fail')
        self._expect_fail = False if expect_fail is None else expect_fail
        expect_signal = TestDict.get('expect-signal')
        self._expect_signal = False if expect_signal is None else expect_signal

        self._verdict = None
        self._logfile = Logfile

        self._proc = None
        self._reader = None
        self._time_expire = None
        # Set here as well as in start() so verdict() is safe at any time.
        self._kill_timeout = False

    def __del__(self):
        pass

    def start(self):
        """Starts the test in another process.  Returns True if the
        test was successfully spawned.  False if there was an error
        (the program does not exist or could not be executed).
        """
        # Imported here, in the calling thread, so the class does not depend
        # on module-level imports beyond os/subprocess/time.
        import codecs
        import threading

        command = os.path.abspath(self._program)

        if not os.path.exists(command):
            print("ERROR: The program to execute does not exist (%s)" % (command,))
            return False

        argv = [command]
        if self._args:
            argv.extend(self._args)

        self._proc = None
        self._reader = None
        self._kill_timeout = False

        timestamp = time.strftime("%Y.%m.%d %H:%M:%S")
        self._log_write("====================================================================\n")
        self._log_write("BEGINNG TEST '%s' at %s\n" % (self._program, timestamp))
        self._log_write("--------------------------------------------------------------------\n")
        self._log_flush()

        try:
            self._proc = subprocess.Popen(args=argv, stdout=subprocess.PIPE,
                                          stderr=subprocess.STDOUT)
        except OSError as err:
            print("ERROR: Could not execute the program (%s): %s" % (command, err))
            self._log_write("\nERROR: could not execute the program: %s\n" % (err,))
            self._finalize_log()
            return False

        self._time_expire = time.time() + self._timeout

        # The reader gets the pipe and the log file as plain arguments (the
        # target is a staticmethod), so the thread holds no reference to self.
        decoder = codecs.getincrementaldecoder('utf-8')('replace')
        self._reader = threading.Thread(
            target=self._drain_output,
            args=(self._proc.stdout, self._logfile, decoder))
        self._reader.daemon = True
        self._reader.start()

        return True

    def wait(self):
        """Blocks until the test completes or times out, whichever
        comes first.  Returns True if the process finished within the
        time limit (whatever its exit status), and False if it was never
        started or had to be killed because it ran out of time.  Use
        verdict() to learn whether the test passed.
        """

        if self._proc is None:
            print("ERROR: Test was never started")
            return False

        while self._proc.poll() is None:
            remaining = self._time_expire - time.time()
            if remaining <= 0:
                self.kill()
                return False
            time.sleep(min(self._poll_interval, remaining))

        self._join_reader()
        self._finalize_log()

        return True

    def kill(self):
        """Kill the currently running test (if there is one).

        The child is first asked to terminate; if it is still alive after
        a short grace period it is killed outright.  Returns False if no
        test was ever started, True otherwise.  After this call verdict()
        reports 'FAIL/TIMEOUT'.
        """

        if self._proc is None:
            print("WARNING: killing a test was never started")
            return False

        self._kill_timeout = True

        # Only signal a child that is still running: once it has been
        # reaped its pid may no longer exist (or may belong to another
        # process).
        if self._proc.poll() is None:
            self._signal_child(self._proc.terminate)
            if not self._wait_for_exit(self._kill_grace):
                self._signal_child(self._proc.kill)
                self._wait_for_exit(self._kill_grace)

        self._join_reader()
        self._log_write("\nKilling process by request...\n")
        self._log_flush()
        self._finalize_log()

        return True

    def verdict(self):
        """Returns a string, either 'PASS', 'FAIL', 'FAIL/TIMEOUT', or
        'FAIL/SIGNAL(n)'.

        'FAIL' is also returned (after printing an error) when the test
        was never started or is still running.
        """
        if self._proc is None:
            print("ERROR: Test was never started")
            return "FAIL"

        self._proc.poll()
        rc = self._proc.returncode

        if self._kill_timeout:
            return "FAIL/TIMEOUT"

        if rc is None:
            # No exit status yet, so nothing below can be evaluated.
            print("ERROR: test is still running")
            return "FAIL"

        if rc < 0:
            # subprocess reports death by signal N as return code -N.
            if self._expect_signal:
                return "PASS"
            return "FAIL/SIGNAL(%d)" % (-rc,)

        failed = (rc != 0)
        if failed == bool(self._expect_fail):
            return "PASS"
        return "FAIL"

    @staticmethod
    def _drain_output(pipe, logfile, decoder):
        """Thread body: copy everything from `pipe` to `logfile` until EOF.

        When `logfile` is None the data is read and thrown away.  Chunks
        that are not `str` (bytes on Python 3) are run through `decoder`
        before being written.
        """
        fd = pipe.fileno()
        while True:
            try:
                # os.read returns as soon as any data is available.
                chunk = os.read(fd, 4096)
            except OSError:
                # Treat a read error as the end of the stream.
                break
            if not chunk:
                break
            if logfile is None:
                continue
            if not isinstance(chunk, str):
                chunk = decoder.decode(chunk)
            logfile.write(chunk)
            logfile.flush()

    def _join_reader(self):
        """Give the output reader a bounded time to finish, then close the pipe."""
        reader = self._reader
        if reader is None:
            return
        # Bounded: something other than the child may still hold the pipe
        # open, in which case EOF never arrives.
        reader.join(self._drain_grace)
        if not reader.is_alive():
            self._reader = None
            self._proc.stdout.close()

    def _wait_for_exit(self, seconds):
        """Poll the child for up to `seconds`; return True once it has exited."""
        deadline = time.time() + seconds
        while self._proc.poll() is None:
            if time.time() >= deadline:
                return False
            time.sleep(0.05)
        return True

    def _signal_child(self, send):
        """Call `send` (terminate or kill), tolerating a child that just exited."""
        try:
            send()
        except OSError:
            # The child went away between the poll and the signal.
            pass

    def _finalize_log(self):
        """Write the end-of-test banner to the log and flush it."""
        timestamp = time.strftime("%Y.%m.%d %H:%M:%S")
        self._log_write("--------------------------------------------------------------------\n")
        self._log_write("ENDING TEST '%s' at %s\n" % (self._program, timestamp))
        self._log_write("====================================================================\n")
        self._log_flush()

    def _log_write(self, data):
        """Write `data` to the log file, if there is one."""
        if self._logfile is not None:
            self._logfile.write(data)

    def _log_flush(self):
        """Flush the log file, if there is one."""
        if self._logfile is not None:
            self._logfile.flush()
217
def setup_logfile(override_logfile_name = None):
    """Open a logfile and prepare it for use with TestFlinger logging.

    The filename will be generated based on the current date/time
    (test-log-YYYY.MM.DD.HHMMSS.txt in the current directory).  If a file
    of that name already exists, the function waits until the timestamp
    yields an unused name.

    If override_logfile_name is not None, then that filename will be
    used instead; an existing file of that name is removed first.

    Returns the open, writable file object with the test-set header
    already written to it.

    See also: close_logfile()
    """
    name_pattern = "test-log-%Y.%m.%d.%H%M%S.txt"

    if override_logfile_name is not None:
        tmpfile = override_logfile_name
        if os.path.exists(tmpfile):
            os.unlink(tmpfile)
    else:
        tmpfile = time.strftime(name_pattern)
        while os.path.exists(tmpfile):
            # The name only changes once per second, so sleep between
            # attempts instead of spinning on the CPU.
            time.sleep(0.1)
            tmpfile = time.strftime(name_pattern)

    fobj = open(tmpfile, 'wt')
    print("Logging to %s" % (tmpfile,))
    try:
        timestamp = time.strftime("%Y.%m.%d %H:%M:%S")
        fobj.write("BEGINNING TEST SET %s\n" % (timestamp,))
        fobj.write("====================================================================\n")
    except Exception:
        # Do not leak the handle if the header cannot be written.
        fobj.close()
        raise
    return fobj
243
def close_logfile(fobj):
    """Convenience function for closing a TestFlinger log file.

    Writes the end-of-test-set trailer and then closes the file, so the
    trailer is flushed to disk and the handle is released.  The file is
    closed even if writing the trailer fails.

    fobj: an open and writeable file object

    See also : setup_logfile()
    """

    timestamp = time.strftime("%Y.%m.%d %H:%M:%S")
    try:
        fobj.write("====================================================================\n")
        fobj.write("CLOSING TEST SET %s\n" % (timestamp,))
    finally:
        fobj.close()
255
# Running this file directly does nothing; the guard body is a placeholder.
if __name__ == "__main__":
    pass
258