test.py revision 62655786cbb12ebe901411e8f4b55e9394e4c633
1# Shell class for a test, inherited by all individual tests 2# 3# Methods: 4# __init__ initialise 5# initialize run once for each job 6# setup run once for each new version of the test installed 7# run run the test (wrapped by job.run_test()) 8# 9# Data: 10# job backreference to the job this test instance is part of 11# outputdir eg. results/<job>/<testname.tag> 12# resultsdir eg. results/<job>/<testname.tag>/results 13# profdir eg. results/<job>/<testname.tag>/profiling 14# debugdir eg. results/<job>/<testname.tag>/debug 15# bindir eg. tests/<test> 16# src eg. tests/<test>/src 17# tmpdir eg. tmp/<testname.tag> 18 19import os, sys, re, fcntl, shutil, tarfile, warnings 20 21from autotest_lib.client.common_lib import error, utils 22 23 24class base_test: 25 preserve_srcdir = False 26 27 def __init__(self, job, bindir, outputdir): 28 self.job = job 29 self.autodir = job.autodir 30 31 self.outputdir = outputdir 32 tagged_testname = os.path.basename(self.outputdir) 33 self.resultsdir = os.path.join(self.outputdir, 'results') 34 os.mkdir(self.resultsdir) 35 self.profdir = os.path.join(self.outputdir, 'profiling') 36 os.mkdir(self.profdir) 37 self.debugdir = os.path.join(self.outputdir, 'debug') 38 os.mkdir(self.debugdir) 39 self.bindir = bindir 40 if hasattr(job, 'libdir'): 41 self.libdir = job.libdir 42 self.srcdir = os.path.join(self.bindir, 'src') 43 44 self.tmpdir = os.path.join(job.tmpdir, tagged_testname) 45 46 if os.path.exists(self.tmpdir): 47 shutil.rmtree(self.tmpdir) 48 os.mkdir(self.tmpdir) 49 50 51 def assert_(self, expr, msg='Assertion failed.'): 52 if not expr: 53 raise error.TestError(msg) 54 55 56 def write_test_keyval(self, attr_dict): 57 utils.write_keyval(self.outputdir, attr_dict) 58 59 60 @staticmethod 61 def _append_type_to_keys(dictionary, typename): 62 new_dict = {} 63 for key, value in dictionary.iteritems(): 64 new_key = "%s{%s}" % (key, typename) 65 new_dict[new_key] = value 66 return new_dict 67 68 69 def write_perf_keyval(self, perf_dict): 70 self.write_iteration_keyval({}, perf_dict) 71 72 73 def write_attr_keyval(self, attr_dict): 74 self.write_iteration_keyval(attr_dict, {}) 75 76 77 def write_iteration_keyval(self, attr_dict, perf_dict): 78 if attr_dict: 79 attr_dict = self._append_type_to_keys(attr_dict, "attr") 80 utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr") 81 82 if perf_dict: 83 perf_dict = self._append_type_to_keys(perf_dict, "perf") 84 utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf") 85 86 keyval_path = os.path.join(self.resultsdir, "keyval") 87 print >> open(keyval_path, "a"), "" 88 89 90 def initialize(self): 91 print 'No initialize phase defined' 92 pass 93 94 95 def setup(self): 96 pass 97 98 99 def warmup(self): 100 pass 101 102 103 def execute(self, iterations=1, *args, **dargs): 104 self.warmup(*args, **dargs) 105 106 profilers = self.job.profilers 107 # Dropped profilers.only() - if you want that, use iterations=0 108 for i in range(iterations): 109 self.run_once(*args, **dargs) 110 111 # Do a profiling run if necessary 112 if profilers.present(): 113 profilers.start(self) 114 self.run_once(*args, **dargs) 115 profilers.stop(self) 116 profilers.report(self) 117 118 # Do any postprocessing, normally extracting performance keyvals, etc 119 self.postprocess() 120 121 122 def postprocess(self): 123 pass 124 125 126 def cleanup(self): 127 pass 128 129 130 def _exec(self, args, dargs): 131 self.job.stdout.tee_redirect(os.path.join(self.debugdir, 'stdout')) 132 self.job.stderr.tee_redirect(os.path.join(self.debugdir, 'stderr')) 133 134 try: 135 # write out the test attributes into a keyval 136 dargs = dargs.copy() 137 keyvals = dargs.pop('test_attributes', dict()).copy() 138 keyvals['version'] = self.version 139 self.write_test_keyval(keyvals) 140 141 _validate_args(args, dargs, self.initialize, self.setup, 142 self.execute, self.cleanup) 143 144 try: 145 # Initialize: 146 p_args, p_dargs = _cherry_pick_args(self.initialize,args,dargs) 147 self.initialize(*p_args, **p_dargs) 148 149 # Setup: (compile and install the test, if needed) 150 p_args, p_dargs = _cherry_pick_args(self.setup,args,dargs) 151 utils.update_version(self.srcdir, self.preserve_srcdir, 152 self.version, self.setup, 153 *p_args, **p_dargs) 154 155 # Execute: 156 os.chdir(self.outputdir) 157 if hasattr(self, 'run_once'): 158 p_args, p_dargs = _cherry_pick_args(self.run_once, 159 args, dargs) 160 if 'iterations' in dargs: 161 p_dargs['iterations'] = dargs['iterations'] 162 else: 163 p_args, p_dargs = _cherry_pick_args(self.execute, 164 args, dargs) 165 try: 166 self.execute(*p_args, **p_dargs) 167 except error.AutotestError: 168 raise 169 except Exception, e: 170 raise error.UnhandledTestFail(e) 171 172 finally: 173 self.cleanup() 174 self.job.stderr.restore() 175 self.job.stdout.restore() 176 except error.AutotestError: 177 raise 178 except Exception, e: 179 raise error.UnhandledTestError(e) 180 181 182def _cherry_pick_args(func, args, dargs): 183 # Cherry pick args: 184 if func.func_code.co_flags & 0x04: 185 # func accepts *args, so return the entire args. 186 p_args = args 187 else: 188 p_args = () 189 190 # Cherry pick dargs: 191 if func.func_code.co_flags & 0x08: 192 # func accepts **dargs, so return the entire dargs. 193 p_dargs = dargs 194 else: 195 p_dargs = {} 196 for param in func.func_code.co_varnames[:func.func_code.co_argcount]: 197 if param in dargs: 198 p_dargs[param] = dargs[param] 199 200 return p_args, p_dargs 201 202 203def _validate_args(args, dargs, *funcs): 204 all_co_flags = 0 205 all_varnames = () 206 for func in funcs: 207 all_co_flags |= func.func_code.co_flags 208 all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount] 209 210 # Check if given args belongs to at least one of the methods below. 211 if len(args) > 0: 212 # Current implementation doesn't allow the use of args. 213 raise error.AutotestError('Unnamed arguments not accepted. Please, ' \ 214 'call job.run_test with named args only') 215 216 # Check if given dargs belongs to at least one of the methods below. 217 if len(dargs) > 0: 218 if not all_co_flags & 0x08: 219 # no func accepts *dargs, so: 220 for param in dargs: 221 if not param in all_varnames: 222 raise error.AutotestError('Unknown parameter: %s' % param) 223 224 225def testname(url): 226 # Extract the testname from the test url. 227 match = re.match('[^:]+://(.*)/([^/]*)$', url) 228 if not match: 229 return ('', url) 230 (group, filename) = match.groups() 231 232 # Generate the group prefix. 233 group = re.sub(r'\W', '_', group) 234 235 # Drop the extension to get the raw test name. 236 testname = re.sub(r'\.tgz', '', filename) 237 238 return (group, testname) 239 240 241def _installtest(job, url): 242 (group, name) = testname(url) 243 244 # Bail if the test is already installed 245 group_dir = os.path.join(job.testdir, "download", group) 246 if os.path.exists(os.path.join(group_dir, name)): 247 return (group, name) 248 249 # If the group directory is missing create it and add 250 # an empty __init__.py so that sub-directories are 251 # considered for import. 252 if not os.path.exists(group_dir): 253 os.mkdir(group_dir) 254 f = file(os.path.join(group_dir, '__init__.py'), 'w+') 255 f.close() 256 257 print name + ": installing test url=" + url 258 utils.get_file(url, os.path.join(group_dir, 'test.tgz')) 259 old_wd = os.getcwd() 260 os.chdir(group_dir) 261 tar = tarfile.open('test.tgz') 262 for member in tar.getmembers(): 263 tar.extract(member) 264 tar.close() 265 os.chdir(old_wd) 266 os.remove(os.path.join(group_dir, 'test.tgz')) 267 268 # For this 'sub-object' to be importable via the name 269 # 'group.name' we need to provide an __init__.py, 270 # so link the main entry point to this. 271 os.symlink(name + '.py', os.path.join(group_dir, name, 272 '__init__.py')) 273 274 # The test is now installed. 275 return (group, name) 276 277 278def runtest(job, url, tag, args, dargs, 279 local_namespace={}, global_namespace={}, after_test_hook=None): 280 local_namespace = local_namespace.copy() 281 global_namespace = global_namespace.copy() 282 283 # if this is not a plain test name then download and install the 284 # specified test 285 if utils.is_url(url): 286 (group, testname) = _installtest(job, url) 287 bindir = os.path.join(job.testdir, 'download', group, testname) 288 site_bindir = None 289 else: 290 # if the test is local, it can be found in either testdir 291 # or site_testdir. tests in site_testdir override tests 292 # defined in testdir 293 (group, testname) = ('', url) 294 bindir = os.path.join(job.testdir, group, testname) 295 if hasattr(job, 'site_testdir'): 296 site_bindir = os.path.join(job.site_testdir, 297 group, testname) 298 else: 299 site_bindir = None 300 301 outputdir = os.path.join(job.resultdir, testname) 302 if tag: 303 outputdir += '.' + tag 304 305 # if we can find the test in site_bindir, use this version 306 if site_bindir and os.path.exists(site_bindir): 307 bindir = site_bindir 308 testdir = job.site_testdir 309 elif os.path.exists(bindir): 310 testdir = job.testdir 311 elif not os.path.exists(bindir): 312 raise error.TestError(testname + ': test does not exist') 313 314 if group: 315 sys.path.insert(0, os.path.join(testdir, 'download')) 316 group += '.' 317 else: 318 sys.path.insert(0, os.path.join(testdir, testname)) 319 320 local_namespace['job'] = job 321 local_namespace['bindir'] = bindir 322 local_namespace['outputdir'] = outputdir 323 324 lockfile = open(os.path.join(job.tmpdir, '.testlock'), 'w') 325 try: 326 fcntl.flock(lockfile, fcntl.LOCK_EX) 327 exec ("import %s%s" % (group, testname), 328 local_namespace, global_namespace) 329 exec ("mytest = %s%s.%s(job, bindir, outputdir)" % 330 (group, testname, testname), 331 local_namespace, global_namespace) 332 finally: 333 fcntl.flock(lockfile, fcntl.LOCK_UN) 334 lockfile.close() 335 sys.path.pop(0) 336 337 pwd = os.getcwd() 338 os.chdir(outputdir) 339 try: 340 mytest = global_namespace['mytest'] 341 mytest._exec(args, dargs) 342 finally: 343 if after_test_hook: 344 after_test_hook(mytest) 345