# test.py revision a49c5cb549607a2049c45eed66200e48aae48c3b
# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialise
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       run             run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       src             eg. tests/<test>/src
#       tmpdir          eg. tmp/<tempname>_<testname.tag>

import fcntl, os, re, sys, shutil, tarfile, tempfile, time, traceback
import warnings

from autotest_lib.client.common_lib import error, packages, debug
from autotest_lib.client.bin import utils


class base_test:
    # When True, setup() machinery keeps the existing src dir instead of
    # re-extracting it (consumed by utils.update_version in _exec).
    preserve_srcdir = False

    def __init__(self, job, bindir, outputdir):
        """Create the per-test directory layout under outputdir.

        @param job: the job object this test instance belongs to.
        @param bindir: directory holding the test's code (tests/<test>).
        @param outputdir: results/<job>/<testname.tag>; the results,
            profiling and debug subdirectories are created here.
        """
        self.job = job
        self.pkgmgr = job.pkgmgr
        self.autodir = job.autodir

        self.outputdir = outputdir
        tagged_testname = os.path.basename(self.outputdir)
        self.resultsdir = os.path.join(self.outputdir, 'results')
        os.mkdir(self.resultsdir)
        self.profdir = os.path.join(self.outputdir, 'profiling')
        os.mkdir(self.profdir)
        self.debugdir = os.path.join(self.outputdir, 'debug')
        os.mkdir(self.debugdir)
        self.bindir = bindir
        # libdir is optional on the job object (not all job types have it).
        if hasattr(job, 'libdir'):
            self.libdir = job.libdir
        self.srcdir = os.path.join(self.bindir, 'src')
        # Suffix the tmpdir with the tagged test name so concurrent tests
        # in the same job tmpdir do not collide.
        self.tmpdir = tempfile.mkdtemp("_" + tagged_testname, dir=job.tmpdir)
        self.test_log = debug.get_logger(module='tests')


    def assert_(self, expr, msg='Assertion failed.'):
        """Raise error.TestError(msg) if expr is false."""
        if not expr:
            raise error.TestError(msg)


    def write_test_keyval(self, attr_dict):
        """Write attr_dict as a keyval file in the test's outputdir."""
        utils.write_keyval(self.outputdir, attr_dict)


    @staticmethod
    def _append_type_to_keys(dictionary, typename):
        """Return a copy of dictionary with each key rewritten as
        "key{typename}" (e.g. "throughput{perf}")."""
        new_dict = {}
        for key, value in dictionary.iteritems():
            new_key = "%s{%s}" % (key, typename)
            new_dict[new_key] = value
        return new_dict


    def write_perf_keyval(self, perf_dict):
        """Write performance keyvals only for the current iteration."""
        self.write_iteration_keyval({}, perf_dict)


    def write_attr_keyval(self, attr_dict):
        """Write attribute keyvals only for the current iteration."""
        self.write_iteration_keyval(attr_dict, {})


    def write_iteration_keyval(self, attr_dict, perf_dict):
        """Write attribute and/or performance keyvals for one iteration.

        Keys are tagged with their type ({attr}/{perf}) and a blank line
        is appended to the keyval file to delimit this iteration.
        """
        if attr_dict:
            attr_dict = self._append_type_to_keys(attr_dict, "attr")
            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr")

        if perf_dict:
            perf_dict = self._append_type_to_keys(perf_dict, "perf")
            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf")

        keyval_path = os.path.join(self.resultsdir, "keyval")
        # NOTE(review): the file object returned by open() is never closed
        # explicitly; CPython's refcounting closes it immediately, but a
        # with-block would be safer — confirm before changing.
        print >> open(keyval_path, "a"), ""


    def initialize(self):
        """Per-job hook; subclasses override. Default just logs a notice."""
        print 'No initialize phase defined'
        pass


    def setup(self):
        """Per-version install hook; subclasses override (no-op here)."""
        pass


    def warmup(self, *args, **dargs):
        """Pre-iteration warmup hook; subclasses override (no-op here)."""
        pass


    def drop_caches_between_iterations(self):
        """Drop system caches if the job is configured to do so between
        iterations (keeps iterations independent of page-cache state)."""
        if self.job.drop_caches_between_iterations:
            print "Dropping caches between iterations"
            utils.drop_caches()


    def execute(self, iterations=None, test_length=None, profile_only=False,
                _get_time=time.time, postprocess_profiled_run=None,
                *args, **dargs):
        """
        This is the basic execute method for the tests inherited from base_test.
        If you want to implement a benchmark test, it's better to implement
        the run_once function, to cope with the profiling infrastructure. For
        other tests, you can just override the default implementation.

        @param test_length: The minimum test length in seconds. We'll run the
            run_once function for a number of times large enough to cover the
            minimum test length.

        @param iterations: A number of iterations that we'll run the run_once
            function. This parameter is incompatible with test_length and will
            be silently ignored if you specify both.

        @param profile_only: Do not run any test iterations before running
            the test under the profiler.  This is equivalent to specifying
            iterations=0 but is much easier to remember/read/comprehend when
            making control files with job.run_test(profile_only=True) in it
            rather than job.run_test(iterations=0).

        @param _get_time: [time.time] Used for unit test time injection.

        @param postprocess_profiled_run: Run the postprocessing for the
            profiled run.
        """

        self.warmup(*args, **dargs)
        # For our special class of tests, the benchmarks, we don't want
        # profilers to run during the test iterations. Let's reserve only
        # the last iteration for profiling, if needed. So let's stop
        # all profilers if they are present and active.
        profilers = self.job.profilers
        if profilers.active():
            profilers.stop(self)
        # If the user called this test in an odd way (specified both iterations
        # and test_length), let's warn them.
        if iterations and test_length:
            self.test_log.info(
                    'Iterations parameter ignored (timed execution).')
        if test_length:
            # Timed mode: keep running run_once until at least test_length
            # seconds have elapsed.
            test_start = _get_time()
            time_elapsed = 0
            timed_counter = 0
            self.test_log.info('Test started. Minimum test length: %d s',
                               test_length)
            while time_elapsed < test_length:
                timed_counter = timed_counter + 1
                if time_elapsed == 0:
                    self.test_log.info('Executing iteration %d', timed_counter)
                elif time_elapsed > 0:
                    self.test_log.info(
                            'Executing iteration %d, time_elapsed %d s',
                            timed_counter, time_elapsed)
                self.drop_caches_between_iterations()
                self.run_once(*args, **dargs)
                test_iteration_finish = _get_time()
                time_elapsed = test_iteration_finish - test_start
            self.test_log.info('Test finished after %d iterations',
                               timed_counter)
            self.test_log.info('Time elapsed: %d s', time_elapsed)
        else:
            # Counted mode: run a fixed number of iterations (default 1,
            # 0 when profile_only is requested).
            orig_iterations = iterations
            if profile_only:
                if iterations:
                    self.test_log.info('Iterations parameter ignored '
                                       '(profile_only=True).')
                iterations = 0
            elif iterations is None:
                iterations = 1
            if iterations:
                self.test_log.info('Test started. '
                                   'Number of iterations: %d', iterations)
                # self.iteration is 1-based so run_once can tell a normal
                # iteration apart from the profiling run (iteration 0).
                for self.iteration in xrange(1, iterations+1):
                    self.test_log.info('Executing iteration %d of %d',
                                       self.iteration, iterations)
                    self.drop_caches_between_iterations()
                    self.run_once(*args, **dargs)
                    self.postprocess_iteration()
                self.test_log.info('Test finished after %d iterations.',
                                   iterations)

        self.run_once_profiling(postprocess_profiled_run, *args, **dargs)

        # Do any postprocessing, normally extracting performance keyvals, etc
        self.postprocess()


    def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
        """Run one extra run_once() iteration under the job's profilers,
        if any profilers are present. Sets self.iteration = 0 as the marker
        for "this is the profiling run"."""
        profilers = self.job.profilers
        # Do a profiling run if necessary
        if profilers.present():
            self.drop_caches_between_iterations()
            profilers.start(self)
            print 'Profilers present. Profiling run started'
            try:
                self.iteration = 0  # indicator this is a profiling run
                self.run_once(*args, **dargs)

                # Priority to the run_once() argument over the attribute.
                postprocess_attribute = getattr(self,
                                                'postprocess_profiled_run',
                                                False)

                if (postprocess_profiled_run or
                        (postprocess_profiled_run is None and
                         postprocess_attribute)):
                    self.postprocess_iteration()

            finally:
                # Always stop and report the profilers, even if run_once
                # raised.
                profilers.stop(self)
                profilers.report(self)


    def postprocess(self):
        """Post-run hook (e.g. extract perf keyvals); subclasses override."""
        pass


    def postprocess_iteration(self):
        """Per-iteration postprocessing hook; subclasses override."""
        pass


    def cleanup(self):
        """Cleanup hook run after execute(); subclasses override."""
        pass


    def _run_cleanup(self, args, dargs):
        """Call self.cleanup and convert exceptions as appropriate.

        Args:
          args: An argument tuple to pass to cleanup.
          dargs: A dictionary of with potential keyword arguments for cleanup.
        """
        p_args, p_dargs = _cherry_pick_args(self.cleanup, args, dargs)
        try:
            self.cleanup(*p_args, **p_dargs)
        except error.AutotestError:
            raise
        except Exception, e:
            # Other exceptions must be treated as a ERROR when
            # raised during the cleanup() phase.
            raise error.UnhandledTestError(e)


    def _exec(self, args, dargs):
        """Drive the full test lifecycle: keyval recording, argument
        validation, initialize(), locked setup(), execute(), and cleanup(),
        with stdout/stderr tee'd into the test's debug directory.

        Uncategorized exceptions from execute() become UnhandledTestFail;
        anything else uncategorized becomes UnhandledTestError.
        """

        self.job.stdout.tee_redirect(os.path.join(self.debugdir, 'stdout'))
        self.job.stderr.tee_redirect(os.path.join(self.debugdir, 'stderr'))

        try:
            # write out the test attributes into a keyval
            dargs = dargs.copy()
            run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
            keyvals = dargs.pop('test_attributes', {}).copy()
            keyvals['version'] = self.version
            for i, arg in enumerate(args):
                keyvals['param-%d' % i] = repr(arg)
            for name, arg in dargs.iteritems():
                keyvals['param-%s' % name] = repr(arg)
            self.write_test_keyval(keyvals)

            _validate_args(args, dargs, self.initialize, self.setup,
                           self.execute, self.cleanup)

            try:
                # Initialize:
                p_args, p_dargs = _cherry_pick_args(self.initialize,args,dargs)
                self.initialize(*p_args, **p_dargs)

                # Serialize setup() across tests in this job via an exclusive
                # flock on a shared lockfile in the job tmpdir.
                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_EX)
                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup,args,dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)
                finally:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                    lockfile.close()

                # Execute:
                os.chdir(self.outputdir)
                if hasattr(self, 'run_once'):
                    p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                        args, dargs)
                    # pull in any non-* and non-** args from self.execute
                    for param in _get_nonstar_args(self.execute):
                        if param in dargs:
                            p_dargs[param] = dargs[param]
                else:
                    p_args, p_dargs = _cherry_pick_args(self.execute,
                                                        args, dargs)
                try:
                    self.execute(*p_args, **p_dargs)
                except error.AutotestError:
                    # Pass already-categorized errors on up as is.
                    raise
                except Exception, e:
                    # Other exceptions must be treated as a FAIL when
                    # raised during the execute() phase.
                    raise error.UnhandledTestFail(e)
            except Exception:
                # Save the exception while we run our cleanup() before
                # reraising it.
                exc_info = sys.exc_info()
                try:
                    try:
                        if run_cleanup:
                            self._run_cleanup(args, dargs)
                    except Exception:
                        # A cleanup failure must not mask the original error.
                        print 'Ignoring exception during cleanup() phase:'
                        traceback.print_exc()
                        print 'Now raising the earlier %s error' % exc_info[0]
                finally:
                    self.job.stderr.restore()
                    self.job.stdout.restore()
                try:
                    raise exc_info[0], exc_info[1], exc_info[2]
                finally:
                    # http://docs.python.org/library/sys.html#sys.exc_info
                    # Be nice and prevent a circular reference.
                    del exc_info
            else:
                # Success path: still run cleanup (if enabled) and always
                # restore the redirected streams.
                try:
                    if run_cleanup:
                        self._run_cleanup(args, dargs)
                finally:
                    self.job.stderr.restore()
                    self.job.stdout.restore()
        except error.AutotestError:
            # Pass already-categorized errors on up.
            raise
        except Exception, e:
            # Anything else is an ERROR in our own code, not execute().
            raise error.UnhandledTestError(e)


def _get_nonstar_args(func):
    """Extract all the (normal) function parameter names.

    Given a function, returns a tuple of parameter names, specifically
    excluding the * and ** parameters, if the function accepts them.

    @param func: A callable that we want to chose arguments for.

    @return: A tuple of parameters accepted by the function.
    """
    return func.func_code.co_varnames[:func.func_code.co_argcount]


def _cherry_pick_args(func, args, dargs):
    """Sanitize positional and keyword arguments before calling a function.

    Given a callable (func), an argument tuple and a dictionary of keyword
    arguments, pick only those arguments which the function is prepared to
    accept and return a new argument tuple and keyword argument dictionary.

    Args:
      func: A callable that we want to choose arguments for.
      args: A tuple of positional arguments to consider passing to func.
      dargs: A dictionary of keyword arguments to consider passing to func.
    Returns:
      A tuple of: (args tuple, keyword arguments dictionary)
    """
    # Cherry pick args:
    # 0x04 is CO_VARARGS: func declares a *args parameter.
    if func.func_code.co_flags & 0x04:
        # func accepts *args, so return the entire args.
        p_args = args
    else:
        p_args = ()

    # Cherry pick dargs:
    # 0x08 is CO_VARKEYWORDS: func declares a **kwargs parameter.
    if func.func_code.co_flags & 0x08:
        # func accepts **dargs, so return the entire dargs.
        p_dargs = dargs
    else:
        # Only return the keyword arguments that func accepts.
        p_dargs = {}
        for param in _get_nonstar_args(func):
            if param in dargs:
                p_dargs[param] = dargs[param]

    return p_args, p_dargs


def _validate_args(args, dargs, *funcs):
    """Verify that arguments are appropriate for at least one callable.

    Given a list of callables as additional parameters, verify that
    the proposed keyword arguments in dargs will each be accepted by at least
    one of the callables.

    NOTE: args is currently not supported and must be empty.

    Args:
      args: A tuple of proposed positional arguments.
      dargs: A dictionary of proposed keyword arguments.
      *funcs: Callables to be searched for acceptance of args and dargs.
    Raises:
      error.AutotestError: if an arg won't be accepted by any of *funcs.
    """
    # Union the CO_VARKEYWORDS/CO_VARARGS flags and named parameters of
    # every candidate callable.
    all_co_flags = 0
    all_varnames = ()
    for func in funcs:
        all_co_flags |= func.func_code.co_flags
        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]

    # Check if given args belongs to at least one of the methods below.
    if len(args) > 0:
        # Current implementation doesn't allow the use of args.
        raise error.TestError('Unnamed arguments not accepted. Please '
                              'call job.run_test with named args only')

    # Check if given dargs belongs to at least one of the methods below.
    if len(dargs) > 0:
        # 0x08 is CO_VARKEYWORDS; if any func takes **kwargs, anything goes.
        if not all_co_flags & 0x08:
            # no func accepts *dargs, so:
            for param in dargs:
                if not param in all_varnames:
                    raise error.AutotestError('Unknown parameter: %s' % param)


def _installtest(job, url):
    """Download and unpack a test tarball under job.testdir/download.

    @param job: job object providing pkgmgr and testdir.
    @param url: URL of the test tarball; the package manager derives the
        (group, name) pair from it.
    @return: the (group, name) tuple identifying the installed test.
    """
    (group, name) = job.pkgmgr.get_package_name(url, 'test')

    # Bail if the test is already installed
    group_dir = os.path.join(job.testdir, "download", group)
    if os.path.exists(os.path.join(group_dir, name)):
        return (group, name)

    # If the group directory is missing create it and add
    # an empty  __init__.py so that sub-directories are
    # considered for import.
    if not os.path.exists(group_dir):
        os.mkdir(group_dir)
        f = file(os.path.join(group_dir, '__init__.py'), 'w+')
        f.close()

    print name + ": installing test url=" + url
    tarball = os.path.basename(url)
    tarball_path = os.path.join(group_dir, tarball)
    test_dir = os.path.join(group_dir, name)
    job.pkgmgr.fetch_pkg(tarball, tarball_path,
                         repo_url = os.path.dirname(url))

    # Create the directory for the test
    if not os.path.exists(test_dir):
        os.mkdir(os.path.join(group_dir, name))

    job.pkgmgr.untar_pkg(tarball_path, test_dir)

    # The tarball is no longer needed once unpacked.
    os.remove(tarball_path)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(group_dir, name,
                                          '__init__.py'))

    # The test is now installed.
    return (group, name)


def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={},
            before_test_hook=None, after_test_hook=None):
    """Locate (or install), import, instantiate and run a test.

    @param job: the job object running this test.
    @param url: plain test name, or a .tar.bz2 URL to install first.
    @param tag: optional tag appended to the output directory name.
    @param args, dargs: positional/keyword arguments forwarded to the
        test's _exec().
    @param local_namespace, global_namespace: namespaces used for the
        dynamic import/instantiation of the test class. NOTE(review):
        mutable defaults, but both are copied immediately so the defaults
        are never mutated.
    @param before_test_hook, after_test_hook: optional callables invoked
        with the test instance before/after execution.
    """
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()

    # if this is not a plain test name then download and install the
    # specified test
    if url.endswith('.tar.bz2'):
        (group, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', group, testname)
        site_bindir = None
    else:
        # if the test is local, it can be found in either testdir
        # or site_testdir. tests in site_testdir override tests
        # defined in testdir
        (group, testname) = ('', url)
        bindir = os.path.join(job.testdir, group, testname)
        if hasattr(job, 'site_testdir'):
            site_bindir = os.path.join(job.site_testdir,
                                       group, testname)
        else:
            site_bindir = None

    # The job object here can be that of a server side job or a client
    # side job. 'install_pkg' method won't be present for server side
    # jobs, so do the fetch only if that method is present in the job
    # obj.
    if hasattr(job, 'install_pkg'):
        try:
            job.install_pkg(testname, 'test', bindir)
        except packages.PackageInstallError, e:
            # continue as a fall back mechanism and see if the test code
            # already exists on the machine
            pass

    outputdir = os.path.join(job.resultdir, testname)
    if tag:
        outputdir += '.' + tag

    # if we can find the test in site_bindir, use this version
    if site_bindir and os.path.exists(site_bindir):
        bindir = site_bindir
        testdir = job.site_testdir
    elif os.path.exists(bindir):
        testdir = job.testdir
    else:
        raise error.TestError(testname + ': test does not exist')

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    # Make the test module importable: downloaded tests import as
    # "group.testname", local tests import as a bare module.
    if group:
        sys.path.insert(0, os.path.join(testdir, 'download'))
        group += '.'
    else:
        sys.path.insert(0, os.path.join(testdir, testname))

    try:
        exec ("import %s%s" % (group, testname),
              local_namespace, global_namespace)
        exec ("mytest = %s%s.%s(job, bindir, outputdir)" %
              (group, testname, testname),
              local_namespace, global_namespace)
    finally:
        # Undo the sys.path manipulation regardless of import success.
        sys.path.pop(0)

    pwd = os.getcwd()
    os.chdir(outputdir)
    try:
        mytest = global_namespace['mytest']
        if before_test_hook:
            before_test_hook(mytest)
        mytest._exec(args, dargs)
    finally:
        # Restore cwd, run the post hook and remove the test's tmpdir
        # even if the test raised.
        os.chdir(pwd)
        if after_test_hook:
            after_test_hook(mytest)
        shutil.rmtree(mytest.tmpdir, ignore_errors=True)