test.py revision b718a1c97cfb674f2e93c5f7476366e5e26826e5
1# Shell class for a test, inherited by all individual tests 2# 3# Methods: 4# __init__ initialise 5# initialize run once for each job 6# setup run once for each new version of the test installed 7# run run the test (wrapped by job.run_test()) 8# 9# Data: 10# job backreference to the job this test instance is part of 11# outputdir eg. results/<job>/<testname.tag> 12# resultsdir eg. results/<job>/<testname.tag>/results 13# profdir eg. results/<job>/<testname.tag>/profiling 14# debugdir eg. results/<job>/<testname.tag>/debug 15# bindir eg. tests/<test> 16# src eg. tests/<test>/src 17# tmpdir eg. tmp/<tempname>_<testname.tag> 18 19import os, sys, re, fcntl, shutil, tarfile, warnings, tempfile 20 21from autotest_lib.client.common_lib import error, utils, packages 22from autotest_lib.client.bin import autotest_utils 23 24 25class base_test: 26 preserve_srcdir = False 27 28 def __init__(self, job, bindir, outputdir): 29 self.job = job 30 self.autodir = job.autodir 31 32 self.outputdir = outputdir 33 tagged_testname = os.path.basename(self.outputdir) 34 self.resultsdir = os.path.join(self.outputdir, 'results') 35 os.mkdir(self.resultsdir) 36 self.profdir = os.path.join(self.outputdir, 'profiling') 37 os.mkdir(self.profdir) 38 self.debugdir = os.path.join(self.outputdir, 'debug') 39 os.mkdir(self.debugdir) 40 self.bindir = bindir 41 if hasattr(job, 'libdir'): 42 self.libdir = job.libdir 43 self.srcdir = os.path.join(self.bindir, 'src') 44 self.tmpdir = tempfile.mkdtemp("_" + tagged_testname, dir=job.tmpdir) 45 46 47 def assert_(self, expr, msg='Assertion failed.'): 48 if not expr: 49 raise error.TestError(msg) 50 51 52 def write_test_keyval(self, attr_dict): 53 utils.write_keyval(self.outputdir, attr_dict) 54 55 56 @staticmethod 57 def _append_type_to_keys(dictionary, typename): 58 new_dict = {} 59 for key, value in dictionary.iteritems(): 60 new_key = "%s{%s}" % (key, typename) 61 new_dict[new_key] = value 62 return new_dict 63 64 65 def write_perf_keyval(self, perf_dict): 66 self.write_iteration_keyval({}, perf_dict) 67 68 69 def write_attr_keyval(self, attr_dict): 70 self.write_iteration_keyval(attr_dict, {}) 71 72 73 def write_iteration_keyval(self, attr_dict, perf_dict): 74 if attr_dict: 75 attr_dict = self._append_type_to_keys(attr_dict, "attr") 76 utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr") 77 78 if perf_dict: 79 perf_dict = self._append_type_to_keys(perf_dict, "perf") 80 utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf") 81 82 keyval_path = os.path.join(self.resultsdir, "keyval") 83 print >> open(keyval_path, "a"), "" 84 85 86 def initialize(self): 87 print 'No initialize phase defined' 88 pass 89 90 91 def setup(self): 92 pass 93 94 95 def warmup(self, *args, **dargs): 96 pass 97 98 99 def execute(self, iterations=1, *args, **dargs): 100 self.warmup(*args, **dargs) 101 102 profilers = self.job.profilers 103 # Dropped profilers.only() - if you want that, use iterations=0 104 for self.iterations in range(1, iterations+1): 105 self.run_once(*args, **dargs) 106 107 # Do a profiling run if necessary 108 if profilers.present(): 109 profilers.start(self) 110 self.run_once(*args, **dargs) 111 profilers.stop(self) 112 profilers.report(self) 113 114 # Do any postprocessing, normally extracting performance keyvals, etc 115 self.postprocess() 116 117 118 def postprocess(self): 119 pass 120 121 122 def cleanup(self): 123 pass 124 125 126 def _exec(self, args, dargs): 127 self.job.stdout.tee_redirect(os.path.join(self.debugdir, 'stdout')) 128 self.job.stderr.tee_redirect(os.path.join(self.debugdir, 'stderr')) 129 130 try: 131 # write out the test attributes into a keyval 132 dargs = dargs.copy() 133 keyvals = dargs.pop('test_attributes', dict()).copy() 134 keyvals['version'] = self.version 135 self.write_test_keyval(keyvals) 136 137 _validate_args(args, dargs, self.initialize, self.setup, 138 self.execute, self.cleanup) 139 140 try: 141 # Initialize: 142 p_args, p_dargs = _cherry_pick_args(self.initialize,args,dargs) 143 self.initialize(*p_args, **p_dargs) 144 145 lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w') 146 try: 147 fcntl.flock(lockfile, fcntl.LOCK_EX) 148 # Setup: (compile and install the test, if needed) 149 p_args, p_dargs = _cherry_pick_args(self.setup,args,dargs) 150 utils.update_version(self.srcdir, self.preserve_srcdir, 151 self.version, self.setup, 152 *p_args, **p_dargs) 153 finally: 154 fcntl.flock(lockfile, fcntl.LOCK_UN) 155 lockfile.close() 156 157 # Execute: 158 os.chdir(self.outputdir) 159 if hasattr(self, 'run_once'): 160 p_args, p_dargs = _cherry_pick_args(self.run_once, 161 args, dargs) 162 if 'iterations' in dargs: 163 p_dargs['iterations'] = dargs['iterations'] 164 else: 165 p_args, p_dargs = _cherry_pick_args(self.execute, 166 args, dargs) 167 try: 168 self.execute(*p_args, **p_dargs) 169 except error.AutotestError: 170 raise 171 except Exception, e: 172 raise error.UnhandledTestFail(e) 173 except: 174 exc_info = sys.exc_info() 175 else: 176 exc_info = None 177 178 # run the cleanup, and then restore the job.std* streams 179 try: 180 # if an exception occurs during the cleanup() call, we 181 # don't want it to override an existing exception 182 # (i.e. exc_info) that was thrown by the test execution 183 if exc_info: 184 try: 185 self.cleanup() 186 finally: 187 try: 188 raise exc_info[0], exc_info[1], exc_info[2] 189 finally: 190 # necessary to prevent a circular reference 191 # between exc_info[2] (the traceback, which 192 # references all the exception stack frames) 193 # and this stack frame (which refs exc_info[2]) 194 del exc_info 195 else: 196 self.cleanup() 197 finally: 198 self.job.stderr.restore() 199 self.job.stdout.restore() 200 201 except error.AutotestError: 202 raise 203 except Exception, e: 204 raise error.UnhandledTestError(e) 205 206 207def _cherry_pick_args(func, args, dargs): 208 # Cherry pick args: 209 if func.func_code.co_flags & 0x04: 210 # func accepts *args, so return the entire args. 211 p_args = args 212 else: 213 p_args = () 214 215 # Cherry pick dargs: 216 if func.func_code.co_flags & 0x08: 217 # func accepts **dargs, so return the entire dargs. 218 p_dargs = dargs 219 else: 220 p_dargs = {} 221 for param in func.func_code.co_varnames[:func.func_code.co_argcount]: 222 if param in dargs: 223 p_dargs[param] = dargs[param] 224 225 return p_args, p_dargs 226 227 228def _validate_args(args, dargs, *funcs): 229 all_co_flags = 0 230 all_varnames = () 231 for func in funcs: 232 all_co_flags |= func.func_code.co_flags 233 all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount] 234 235 # Check if given args belongs to at least one of the methods below. 236 if len(args) > 0: 237 # Current implementation doesn't allow the use of args. 238 raise error.AutotestError('Unnamed arguments not accepted. Please, ' \ 239 'call job.run_test with named args only') 240 241 # Check if given dargs belongs to at least one of the methods below. 242 if len(dargs) > 0: 243 if not all_co_flags & 0x08: 244 # no func accepts *dargs, so: 245 for param in dargs: 246 if not param in all_varnames: 247 raise error.AutotestError('Unknown parameter: %s' % param) 248 249 250def _installtest(job, url): 251 (group, name) = job.pkgmgr.get_package_name(url, 'test') 252 253 # Bail if the test is already installed 254 group_dir = os.path.join(job.testdir, "download", group) 255 if os.path.exists(os.path.join(group_dir, name)): 256 return (group, name) 257 258 # If the group directory is missing create it and add 259 # an empty __init__.py so that sub-directories are 260 # considered for import. 261 if not os.path.exists(group_dir): 262 os.mkdir(group_dir) 263 f = file(os.path.join(group_dir, '__init__.py'), 'w+') 264 f.close() 265 266 print name + ": installing test url=" + url 267 tarball = os.path.basename(url) 268 tarball_path = os.path.join(group_dir, tarball) 269 test_dir = os.path.join(group_dir, name) 270 job.pkgmgr.fetch_pkg(tarball, tarball_path, 271 repo_url = os.path.dirname(url)) 272 273 # Create the directory for the test 274 if not os.path.exists(test_dir): 275 os.mkdir(os.path.join(group_dir, name)) 276 277 job.pkgmgr.untar_pkg(tarball_path, test_dir) 278 279 os.remove(tarball_path) 280 281 # For this 'sub-object' to be importable via the name 282 # 'group.name' we need to provide an __init__.py, 283 # so link the main entry point to this. 284 os.symlink(name + '.py', os.path.join(group_dir, name, 285 '__init__.py')) 286 287 # The test is now installed. 288 return (group, name) 289 290 291def runtest(job, url, tag, args, dargs, 292 local_namespace={}, global_namespace={}, after_test_hook=None): 293 local_namespace = local_namespace.copy() 294 global_namespace = global_namespace.copy() 295 296 # if this is not a plain test name then download and install the 297 # specified test 298 if url.endswith('.tar.bz2'): 299 (group, testname) = _installtest(job, url) 300 bindir = os.path.join(job.testdir, 'download', group, testname) 301 site_bindir = None 302 else: 303 # if the test is local, it can be found in either testdir 304 # or site_testdir. tests in site_testdir override tests 305 # defined in testdir 306 (group, testname) = ('', url) 307 bindir = os.path.join(job.testdir, group, testname) 308 if hasattr(job, 'site_testdir'): 309 site_bindir = os.path.join(job.site_testdir, 310 group, testname) 311 else: 312 site_bindir = None 313 314 # The job object here can be that of a server side job or a client 315 # side job. 'install_pkg' method won't be present for server side 316 # jobs, so do the fetch only if that method is present in the job 317 # obj. 318 if hasattr(job, 'install_pkg'): 319 try: 320 job.install_pkg(testname, 'test', bindir) 321 except packages.PackageInstallError, e: 322 # continue as a fall back mechanism and see if the test code 323 # already exists on the machine 324 pass 325 326 outputdir = os.path.join(job.resultdir, testname) 327 if tag: 328 outputdir += '.' + tag 329 330 # if we can find the test in site_bindir, use this version 331 if site_bindir and os.path.exists(site_bindir): 332 bindir = site_bindir 333 testdir = job.site_testdir 334 elif os.path.exists(bindir): 335 testdir = job.testdir 336 else: 337 raise error.TestError(testname + ': test does not exist') 338 339 local_namespace['job'] = job 340 local_namespace['bindir'] = bindir 341 local_namespace['outputdir'] = outputdir 342 343 if group: 344 sys.path.insert(0, os.path.join(testdir, 'download')) 345 group += '.' 346 else: 347 sys.path.insert(0, os.path.join(testdir, testname)) 348 349 try: 350 exec ("import %s%s" % (group, testname), 351 local_namespace, global_namespace) 352 exec ("mytest = %s%s.%s(job, bindir, outputdir)" % 353 (group, testname, testname), 354 local_namespace, global_namespace) 355 finally: 356 sys.path.pop(0) 357 358 pwd = os.getcwd() 359 os.chdir(outputdir) 360 try: 361 mytest = global_namespace['mytest'] 362 mytest._exec(args, dargs) 363 finally: 364 os.chdir(pwd) 365 if after_test_hook: 366 after_test_hook(mytest) 367 shutil.rmtree(mytest.tmpdir, ignore_errors=True) 368