test.py revision 47faacb31010a57d56296079bbbaf2f141b2a4e0
1# Shell class for a test, inherited by all individual tests 2# 3# Methods: 4# __init__ initialise 5# initialize run once for each job 6# setup run once for each new version of the test installed 7# run run the test (wrapped by job.run_test()) 8# 9# Data: 10# job backreference to the job this test instance is part of 11# outputdir eg. results/<job>/<testname.tag> 12# resultsdir eg. results/<job>/<testname.tag>/results 13# profdir eg. results/<job>/<testname.tag>/profiling 14# debugdir eg. results/<job>/<testname.tag>/debug 15# bindir eg. tests/<test> 16# src eg. tests/<test>/src 17# tmpdir eg. tmp/<tempname>_<testname.tag> 18 19import os, sys, re, fcntl, shutil, tarfile, time, warnings, tempfile 20 21from autotest_lib.client.common_lib import error, utils, packages 22from autotest_lib.client.bin import autotest_utils 23 24 25class base_test: 26 preserve_srcdir = False 27 28 def __init__(self, job, bindir, outputdir): 29 self.job = job 30 self.autodir = job.autodir 31 32 self.outputdir = outputdir 33 tagged_testname = os.path.basename(self.outputdir) 34 self.resultsdir = os.path.join(self.outputdir, 'results') 35 os.mkdir(self.resultsdir) 36 self.profdir = os.path.join(self.outputdir, 'profiling') 37 os.mkdir(self.profdir) 38 self.debugdir = os.path.join(self.outputdir, 'debug') 39 os.mkdir(self.debugdir) 40 self.bindir = bindir 41 if hasattr(job, 'libdir'): 42 self.libdir = job.libdir 43 self.srcdir = os.path.join(self.bindir, 'src') 44 self.tmpdir = tempfile.mkdtemp("_" + tagged_testname, dir=job.tmpdir) 45 46 47 def assert_(self, expr, msg='Assertion failed.'): 48 if not expr: 49 raise error.TestError(msg) 50 51 52 def write_test_keyval(self, attr_dict): 53 utils.write_keyval(self.outputdir, attr_dict) 54 55 56 @staticmethod 57 def _append_type_to_keys(dictionary, typename): 58 new_dict = {} 59 for key, value in dictionary.iteritems(): 60 new_key = "%s{%s}" % (key, typename) 61 new_dict[new_key] = value 62 return new_dict 63 64 65 def write_perf_keyval(self, perf_dict): 66 self.write_iteration_keyval({}, perf_dict) 67 68 69 def write_attr_keyval(self, attr_dict): 70 self.write_iteration_keyval(attr_dict, {}) 71 72 73 def write_iteration_keyval(self, attr_dict, perf_dict): 74 if attr_dict: 75 attr_dict = self._append_type_to_keys(attr_dict, "attr") 76 utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr") 77 78 if perf_dict: 79 perf_dict = self._append_type_to_keys(perf_dict, "perf") 80 utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf") 81 82 keyval_path = os.path.join(self.resultsdir, "keyval") 83 print >> open(keyval_path, "a"), "" 84 85 86 def initialize(self): 87 print 'No initialize phase defined' 88 pass 89 90 91 def setup(self): 92 pass 93 94 95 def warmup(self, *args, **dargs): 96 pass 97 98 99 def execute(self, iterations=None, test_length=None, *args, **dargs): 100 """ 101 This is the basic execute method for the tests inherited from base_test. 102 If you want to implement a benchmark test, it's better to implement 103 the run_once function, to cope with the profiling infrastructure. For 104 other tests, you can just override the default implementation. 105 106 @param test_length: The minimum test length in seconds. We'll run the 107 run_once function for a number of times large enough to cover the 108 minimum test length. 109 110 @param iterations: A number of iterations that we'll run the run_once 111 function. This parameter is incompatible with test_length and will 112 be silently ignored if you specify both. 113 """ 114 115 self.warmup(*args, **dargs) 116 # For our special class of tests, the benchmarks, we don't want 117 # profilers to run during the test iterations. Let's reserve only 118 # the last iteration for profiling, if needed. So let's stop 119 # all profilers if they are present and active. 120 profilers = self.job.profilers 121 if profilers.active(): 122 profilers.stop(self) 123 # If the user called this test in an odd way (specified both iterations 124 # and test_length), let's warn him 125 if iterations and test_length: 126 print 'Iterations parameter silently ignored (timed execution)' 127 if test_length: 128 test_start = time.time() 129 time_elapsed = 0 130 timed_counter = 0 131 print 'Benchmark started. Minimum test length: %d s' % (test_length) 132 while time_elapsed < test_length: 133 timed_counter = timed_counter + 1 134 if time_elapsed == 0: 135 print 'Executing iteration %d' % (timed_counter) 136 elif time_elapsed > 0: 137 print 'Executing iteration %d, time_elapsed %d s' % \ 138 (timed_counter, time_elapsed) 139 self.run_once(*args, **dargs) 140 test_iteration_finish = time.time() 141 time_elapsed = test_iteration_finish - test_start 142 print 'Benchmark finished after %d iterations' % (timed_counter) 143 print 'Time elapsed: %d s' % (time_elapsed) 144 else: 145 if not iterations: 146 iterations = 1 147 # Dropped profilers.only() - if you want that, use iterations=0 148 print 'Benchmark started. Number of iterations: %d' % (iterations) 149 for self.iteration in range(1, iterations+1): 150 print 'Executing iteration %d of %d' % (self.iteration, 151 iterations) 152 self.run_once(*args, **dargs) 153 print 'Benchmark finished after %d iterations' % (iterations) 154 155 # Do a profiling run if necessary 156 if profilers.present(): 157 profilers.start(self) 158 print 'Profilers present. Profiling run started' 159 self.run_once(*args, **dargs) 160 profilers.stop(self) 161 profilers.report(self) 162 163 # Do any postprocessing, normally extracting performance keyvals, etc 164 self.postprocess() 165 166 167 def postprocess(self): 168 pass 169 170 171 def cleanup(self): 172 pass 173 174 175 def _exec(self, args, dargs): 176 self.job.stdout.tee_redirect(os.path.join(self.debugdir, 'stdout')) 177 self.job.stderr.tee_redirect(os.path.join(self.debugdir, 'stderr')) 178 179 try: 180 # write out the test attributes into a keyval 181 dargs = dargs.copy() 182 keyvals = dargs.pop('test_attributes', dict()).copy() 183 keyvals['version'] = self.version 184 for i, arg in enumerate(args): 185 keyvals['param-%d' % i] = repr(arg) 186 for name, arg in dargs.iteritems(): 187 keyvals['param-%s' % name] = repr(arg) 188 self.write_test_keyval(keyvals) 189 190 _validate_args(args, dargs, self.initialize, self.setup, 191 self.execute, self.cleanup) 192 193 try: 194 # Initialize: 195 p_args, p_dargs = _cherry_pick_args(self.initialize,args,dargs) 196 self.initialize(*p_args, **p_dargs) 197 198 lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w') 199 try: 200 fcntl.flock(lockfile, fcntl.LOCK_EX) 201 # Setup: (compile and install the test, if needed) 202 p_args, p_dargs = _cherry_pick_args(self.setup,args,dargs) 203 utils.update_version(self.srcdir, self.preserve_srcdir, 204 self.version, self.setup, 205 *p_args, **p_dargs) 206 finally: 207 fcntl.flock(lockfile, fcntl.LOCK_UN) 208 lockfile.close() 209 210 # Execute: 211 if self.job.drop_caches: 212 print "Dropping caches before running test" 213 autotest_utils.drop_caches() 214 215 os.chdir(self.outputdir) 216 if hasattr(self, 'run_once'): 217 p_args, p_dargs = _cherry_pick_args(self.run_once, 218 args, dargs) 219 if 'iterations' in dargs: 220 p_dargs['iterations'] = dargs['iterations'] 221 if 'test_length' in dargs: 222 p_dargs['test_length'] = dargs['test_length'] 223 else: 224 p_args, p_dargs = _cherry_pick_args(self.execute, 225 args, dargs) 226 try: 227 self.execute(*p_args, **p_dargs) 228 except error.AutotestError: 229 raise 230 except Exception, e: 231 raise error.UnhandledTestFail(e) 232 except: 233 exc_info = sys.exc_info() 234 else: 235 exc_info = None 236 237 # run the cleanup, and then restore the job.std* streams 238 try: 239 p_args, p_dargs = _cherry_pick_args(self.cleanup, args, dargs) 240 # if an exception occurs during the cleanup() call, we 241 # don't want it to override an existing exception 242 # (i.e. exc_info) that was thrown by the test execution 243 if exc_info: 244 try: 245 self.cleanup(*p_args, **p_dargs) 246 finally: 247 try: 248 raise exc_info[0], exc_info[1], exc_info[2] 249 finally: 250 # necessary to prevent a circular reference 251 # between exc_info[2] (the traceback, which 252 # references all the exception stack frames) 253 # and this stack frame (which refs exc_info[2]) 254 del exc_info 255 else: 256 self.cleanup(*p_args, **p_dargs) 257 finally: 258 self.job.stderr.restore() 259 self.job.stdout.restore() 260 261 except error.AutotestError: 262 raise 263 except Exception, e: 264 raise error.UnhandledTestError(e) 265 266 267def _cherry_pick_args(func, args, dargs): 268 # Cherry pick args: 269 if func.func_code.co_flags & 0x04: 270 # func accepts *args, so return the entire args. 271 p_args = args 272 else: 273 p_args = () 274 275 # Cherry pick dargs: 276 if func.func_code.co_flags & 0x08: 277 # func accepts **dargs, so return the entire dargs. 278 p_dargs = dargs 279 else: 280 p_dargs = {} 281 for param in func.func_code.co_varnames[:func.func_code.co_argcount]: 282 if param in dargs: 283 p_dargs[param] = dargs[param] 284 285 return p_args, p_dargs 286 287 288def _validate_args(args, dargs, *funcs): 289 all_co_flags = 0 290 all_varnames = () 291 for func in funcs: 292 all_co_flags |= func.func_code.co_flags 293 all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount] 294 295 # Check if given args belongs to at least one of the methods below. 296 if len(args) > 0: 297 # Current implementation doesn't allow the use of args. 298 raise error.AutotestError('Unnamed arguments not accepted. Please, ' \ 299 'call job.run_test with named args only') 300 301 # Check if given dargs belongs to at least one of the methods below. 302 if len(dargs) > 0: 303 if not all_co_flags & 0x08: 304 # no func accepts *dargs, so: 305 for param in dargs: 306 if not param in all_varnames: 307 raise error.AutotestError('Unknown parameter: %s' % param) 308 309 310def _installtest(job, url): 311 (group, name) = job.pkgmgr.get_package_name(url, 'test') 312 313 # Bail if the test is already installed 314 group_dir = os.path.join(job.testdir, "download", group) 315 if os.path.exists(os.path.join(group_dir, name)): 316 return (group, name) 317 318 # If the group directory is missing create it and add 319 # an empty __init__.py so that sub-directories are 320 # considered for import. 321 if not os.path.exists(group_dir): 322 os.mkdir(group_dir) 323 f = file(os.path.join(group_dir, '__init__.py'), 'w+') 324 f.close() 325 326 print name + ": installing test url=" + url 327 tarball = os.path.basename(url) 328 tarball_path = os.path.join(group_dir, tarball) 329 test_dir = os.path.join(group_dir, name) 330 job.pkgmgr.fetch_pkg(tarball, tarball_path, 331 repo_url = os.path.dirname(url)) 332 333 # Create the directory for the test 334 if not os.path.exists(test_dir): 335 os.mkdir(os.path.join(group_dir, name)) 336 337 job.pkgmgr.untar_pkg(tarball_path, test_dir) 338 339 os.remove(tarball_path) 340 341 # For this 'sub-object' to be importable via the name 342 # 'group.name' we need to provide an __init__.py, 343 # so link the main entry point to this. 344 os.symlink(name + '.py', os.path.join(group_dir, name, 345 '__init__.py')) 346 347 # The test is now installed. 348 return (group, name) 349 350 351def runtest(job, url, tag, args, dargs, 352 local_namespace={}, global_namespace={}, after_test_hook=None): 353 local_namespace = local_namespace.copy() 354 global_namespace = global_namespace.copy() 355 356 # if this is not a plain test name then download and install the 357 # specified test 358 if url.endswith('.tar.bz2'): 359 (group, testname) = _installtest(job, url) 360 bindir = os.path.join(job.testdir, 'download', group, testname) 361 site_bindir = None 362 else: 363 # if the test is local, it can be found in either testdir 364 # or site_testdir. tests in site_testdir override tests 365 # defined in testdir 366 (group, testname) = ('', url) 367 bindir = os.path.join(job.testdir, group, testname) 368 if hasattr(job, 'site_testdir'): 369 site_bindir = os.path.join(job.site_testdir, 370 group, testname) 371 else: 372 site_bindir = None 373 374 # The job object here can be that of a server side job or a client 375 # side job. 'install_pkg' method won't be present for server side 376 # jobs, so do the fetch only if that method is present in the job 377 # obj. 378 if hasattr(job, 'install_pkg'): 379 try: 380 job.install_pkg(testname, 'test', bindir) 381 except packages.PackageInstallError, e: 382 # continue as a fall back mechanism and see if the test code 383 # already exists on the machine 384 pass 385 386 outputdir = os.path.join(job.resultdir, testname) 387 if tag: 388 outputdir += '.' + tag 389 390 # if we can find the test in site_bindir, use this version 391 if site_bindir and os.path.exists(site_bindir): 392 bindir = site_bindir 393 testdir = job.site_testdir 394 elif os.path.exists(bindir): 395 testdir = job.testdir 396 else: 397 raise error.TestError(testname + ': test does not exist') 398 399 local_namespace['job'] = job 400 local_namespace['bindir'] = bindir 401 local_namespace['outputdir'] = outputdir 402 403 if group: 404 sys.path.insert(0, os.path.join(testdir, 'download')) 405 group += '.' 406 else: 407 sys.path.insert(0, os.path.join(testdir, testname)) 408 409 try: 410 exec ("import %s%s" % (group, testname), 411 local_namespace, global_namespace) 412 exec ("mytest = %s%s.%s(job, bindir, outputdir)" % 413 (group, testname, testname), 414 local_namespace, global_namespace) 415 finally: 416 sys.path.pop(0) 417 418 pwd = os.getcwd() 419 os.chdir(outputdir) 420 try: 421 mytest = global_namespace['mytest'] 422 mytest._exec(args, dargs) 423 finally: 424 os.chdir(pwd) 425 if after_test_hook: 426 after_test_hook(mytest) 427 shutil.rmtree(mytest.tmpdir, ignore_errors=True) 428