test.py revision 32cb5b450ce80e5e5871142592973e57f3771ae2
1# Shell class for a test, inherited by all individual tests 2# 3# Methods: 4# __init__ initialise 5# initialize run once for each job 6# setup run once for each new version of the test installed 7# run run the test (wrapped by job.run_test()) 8# 9# Data: 10# job backreference to the job this test instance is part of 11# outputdir eg. results/<job>/<testname.tag> 12# resultsdir eg. results/<job>/<testname.tag>/results 13# profdir eg. results/<job>/<testname.tag>/profiling 14# debugdir eg. results/<job>/<testname.tag>/debug 15# bindir eg. tests/<test> 16# src eg. tests/<test>/src 17# tmpdir eg. tmp/<tempname>_<testname.tag> 18 19import fcntl, os, re, sys, shutil, tarfile, tempfile, time, traceback 20import warnings, logging 21 22from autotest_lib.client.common_lib import error, packages 23from autotest_lib.client.bin import utils 24 25 26class base_test: 27 preserve_srcdir = False 28 network_destabilizing = False 29 30 def __init__(self, job, bindir, outputdir): 31 self.job = job 32 self.pkgmgr = job.pkgmgr 33 self.autodir = job.autodir 34 35 self.outputdir = outputdir 36 self.tagged_testname = os.path.basename(self.outputdir) 37 self.resultsdir = os.path.join(self.outputdir, 'results') 38 os.mkdir(self.resultsdir) 39 self.profdir = os.path.join(self.outputdir, 'profiling') 40 os.mkdir(self.profdir) 41 self.debugdir = os.path.join(self.outputdir, 'debug') 42 os.mkdir(self.debugdir) 43 self.bindir = bindir 44 if hasattr(job, 'libdir'): 45 self.libdir = job.libdir 46 self.srcdir = os.path.join(self.bindir, 'src') 47 self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname, 48 dir=job.tmpdir) 49 self._keyvals = [] 50 self._new_keyval = False 51 self.failed_constraints = [] 52 53 54 def assert_(self, expr, msg='Assertion failed.'): 55 if not expr: 56 raise error.TestError(msg) 57 58 59 def write_test_keyval(self, attr_dict): 60 utils.write_keyval(self.outputdir, attr_dict) 61 62 63 @staticmethod 64 def _append_type_to_keys(dictionary, typename): 65 new_dict = {} 66 for key, value in dictionary.iteritems(): 67 new_key = "%s{%s}" % (key, typename) 68 new_dict[new_key] = value 69 return new_dict 70 71 72 def write_perf_keyval(self, perf_dict): 73 self.write_iteration_keyval({}, perf_dict) 74 75 76 def write_attr_keyval(self, attr_dict): 77 self.write_iteration_keyval(attr_dict, {}) 78 79 80 def write_iteration_keyval(self, attr_dict, perf_dict): 81 # append the dictionaries before they have the {perf} and {attr} added 82 self._keyvals.append({'attr':attr_dict, 'perf':perf_dict}) 83 self._new_keyval = True 84 85 if attr_dict: 86 attr_dict = self._append_type_to_keys(attr_dict, "attr") 87 utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr") 88 89 if perf_dict: 90 perf_dict = self._append_type_to_keys(perf_dict, "perf") 91 utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf") 92 93 keyval_path = os.path.join(self.resultsdir, "keyval") 94 print >> open(keyval_path, "a"), "" 95 96 97 def analyze_perf_constraints(self, constraints): 98 if not self._new_keyval: 99 return 100 101 self._new_keyval = False 102 failures = [] 103 for constraint in constraints: 104 print "___________________ constraint = %s" % constraint 105 print "___________________ keyvals = %s" % self._keyvals[-1]['perf'] 106 try: 107 if not eval(constraint, self._keyvals[-1]['perf']): 108 failures.append('%s' % constraint) 109 except: 110 failures.append('could not evaluate constraint: %s' 111 % constraint) 112 113 # keep track of the errors for each iteration 114 self.failed_constraints.append(failures) 115 116 117 def process_failed_constraints(self): 118 msg = '' 119 for i, failures in enumerate(self.failed_constraints): 120 if failures: 121 msg += 'iteration %d:%s ' % (i, ','.join(failures)) 122 123 if msg: 124 raise error.TestFail(msg) 125 126 127 def initialize(self): 128 pass 129 130 131 def setup(self): 132 pass 133 134 135 def warmup(self, *args, **dargs): 136 pass 137 138 139 def drop_caches_between_iterations(self): 140 if self.job.drop_caches_between_iterations: 141 print "Dropping caches between iterations" 142 utils.drop_caches() 143 144 145 def _call_run_once(self, before_iteration_hook, after_iteration_hook, 146 constraints, args, dargs): 147 self.drop_caches_between_iterations() 148 149 # execute iteration hooks 150 if before_iteration_hook: 151 before_iteration_hook(self) 152 self.run_once(*args, **dargs) 153 if after_iteration_hook: 154 after_iteration_hook(self) 155 156 self.postprocess_iteration() 157 self.analyze_perf_constraints(constraints) 158 159 160 def execute(self, iterations=None, test_length=None, profile_only=False, 161 _get_time=time.time, postprocess_profiled_run=None, 162 before_iteration_hook=None, after_iteration_hook=None, 163 constraints=(), *args, **dargs): 164 """ 165 This is the basic execute method for the tests inherited from base_test. 166 If you want to implement a benchmark test, it's better to implement 167 the run_once function, to cope with the profiling infrastructure. For 168 other tests, you can just override the default implementation. 169 170 @param test_length: The minimum test length in seconds. We'll run the 171 run_once function for a number of times large enough to cover the 172 minimum test length. 173 174 @param iterations: A number of iterations that we'll run the run_once 175 function. This parameter is incompatible with test_length and will 176 be silently ignored if you specify both. 177 178 @param profile_only: Do not run any test iterations before running 179 the test under the profiler. This is equivalent to specifying 180 iterations=0 but is much easier to remember/read/comprehend when 181 making control files with job.run_test(profile_only=True) in it 182 rather than job.run_test(iterations=0). 183 184 @param _get_time: [time.time] Used for unit test time injection. 185 186 @param postprocess_profiled_run: Run the postprocessing for the 187 profiled run. 188 """ 189 190 # For our special class of tests, the benchmarks, we don't want 191 # profilers to run during the test iterations. Let's reserve only 192 # the last iteration for profiling, if needed. So let's stop 193 # all profilers if they are present and active. 194 profilers = self.job.profilers 195 if profilers.active(): 196 profilers.stop(self) 197 # If the user called this test in an odd way (specified both iterations 198 # and test_length), let's warn them. 199 if iterations and test_length: 200 logging.info( 201 'Iterations parameter ignored (timed execution).') 202 if test_length: 203 test_start = _get_time() 204 time_elapsed = 0 205 timed_counter = 0 206 logging.info('Test started. Minimum test length: %d s', 207 test_length) 208 while time_elapsed < test_length: 209 timed_counter = timed_counter + 1 210 if time_elapsed == 0: 211 logging.info('Executing iteration %d', timed_counter) 212 elif time_elapsed > 0: 213 logging.info( 214 'Executing iteration %d, time_elapsed %d s', 215 timed_counter, time_elapsed) 216 self._call_run_once(before_iteration_hook, after_iteration_hook, 217 constraints, args, dargs) 218 test_iteration_finish = _get_time() 219 time_elapsed = test_iteration_finish - test_start 220 logging.info('Test finished after %d iterations', 221 timed_counter) 222 logging.info('Time elapsed: %d s', time_elapsed) 223 else: 224 orig_iterations = iterations 225 if profile_only: 226 if iterations: 227 logging.info('Iterations parameter ignored ' 228 '(profile_only=True).') 229 iterations = 0 230 elif iterations is None: 231 iterations = 1 232 if iterations: 233 logging.info('Test started. ' 234 'Number of iterations: %d', iterations) 235 for self.iteration in xrange(1, iterations+1): 236 logging.info('Executing iteration %d of %d', 237 self.iteration, iterations) 238 self._call_run_once(before_iteration_hook, 239 after_iteration_hook, 240 constraints, args, dargs) 241 logging.info('Test finished after %d iterations.', 242 iterations) 243 244 self.run_once_profiling(postprocess_profiled_run, *args, **dargs) 245 246 # Do any postprocessing, normally extracting performance keyvals, etc 247 self.postprocess() 248 self.process_failed_constraints() 249 250 251 def run_once_profiling(self, postprocess_profiled_run, *args, **dargs): 252 profilers = self.job.profilers 253 # Do a profiling run if necessary 254 if profilers.present(): 255 self.drop_caches_between_iterations() 256 profilers.start(self) 257 print 'Profilers present. Profiling run started' 258 try: 259 self.iteration = 0 # indicator this is a profiling run 260 self.run_once(*args, **dargs) 261 262 # Priority to the run_once() argument over the attribute. 263 postprocess_attribute = getattr(self, 264 'postprocess_profiled_run', 265 False) 266 267 if (postprocess_profiled_run or 268 (postprocess_profiled_run is None and 269 postprocess_attribute)): 270 self.postprocess_iteration() 271 272 finally: 273 profilers.stop(self) 274 profilers.report(self) 275 276 277 def postprocess(self): 278 pass 279 280 281 def postprocess_iteration(self): 282 pass 283 284 285 def cleanup(self): 286 pass 287 288 289 def _setup_test_logging_handler(self): 290 """ 291 Adds a file handler during test execution, which will give test writers 292 the ability to obtain test results on the test results dir just by 293 making logging calls. 294 """ 295 result_filename = os.path.join(self.resultsdir, 296 '%s.log' % self.tagged_testname) 297 self.test_handler = logging.FileHandler(filename=result_filename, 298 mode='w') 299 fmt_str = '[%(asctime)s - %(module)-15s - %(levelname)-8s] %(message)s' 300 self.test_formatter = logging.Formatter(fmt_str) 301 self.test_handler.setFormatter(self.test_formatter) 302 self.logger = logging.getLogger() 303 self.logger.addHandler(self.test_handler) 304 305 306 def _exec(self, before_iteration_hook, after_iteration_hook, args, dargs): 307 308 self.job.stdout.tee_redirect(os.path.join(self.debugdir, 'stdout')) 309 self.job.stderr.tee_redirect(os.path.join(self.debugdir, 'stderr')) 310 self._setup_test_logging_handler() 311 try: 312 if self.network_destabilizing: 313 self.job.disable_warnings("NETWORK") 314 315 # write out the test attributes into a keyval 316 dargs = dargs.copy() 317 run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup) 318 keyvals = dargs.pop('test_attributes', {}).copy() 319 keyvals['version'] = self.version 320 for i, arg in enumerate(args): 321 keyvals['param-%d' % i] = repr(arg) 322 for name, arg in dargs.iteritems(): 323 keyvals['param-%s' % name] = repr(arg) 324 self.write_test_keyval(keyvals) 325 326 _validate_args(args, dargs, self.initialize, self.setup, 327 self.execute, self.cleanup) 328 329 try: 330 # Initialize: 331 _cherry_pick_call(self.initialize, *args, **dargs) 332 333 lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w') 334 try: 335 fcntl.flock(lockfile, fcntl.LOCK_EX) 336 # Setup: (compile and install the test, if needed) 337 p_args, p_dargs = _cherry_pick_args(self.setup,args,dargs) 338 utils.update_version(self.srcdir, self.preserve_srcdir, 339 self.version, self.setup, 340 *p_args, **p_dargs) 341 finally: 342 fcntl.flock(lockfile, fcntl.LOCK_UN) 343 lockfile.close() 344 345 # Execute: 346 os.chdir(self.outputdir) 347 348 # call self.warmup cherry picking the arguments it accepts and 349 # translate exceptions if needed 350 _call_test_function(_cherry_pick_call, self.warmup, 351 *args, **dargs) 352 353 # insert the iteration hooks arguments to be potentially 354 # passed over to self.execute() if it accepts them 355 dargs['before_iteration_hook'] = before_iteration_hook 356 dargs['after_iteration_hook'] = after_iteration_hook 357 358 if hasattr(self, 'run_once'): 359 p_args, p_dargs = _cherry_pick_args(self.run_once, 360 args, dargs) 361 # pull in any non-* and non-** args from self.execute 362 for param in _get_nonstar_args(self.execute): 363 if param in dargs: 364 p_dargs[param] = dargs[param] 365 else: 366 p_args, p_dargs = _cherry_pick_args(self.execute, 367 args, dargs) 368 369 _call_test_function(self.execute, *p_args, **p_dargs) 370 except Exception: 371 # Save the exception while we run our cleanup() before 372 # reraising it. 373 exc_info = sys.exc_info() 374 try: 375 try: 376 if run_cleanup: 377 _cherry_pick_call(self.cleanup, *args, **dargs) 378 except Exception: 379 print 'Ignoring exception during cleanup() phase:' 380 traceback.print_exc() 381 print 'Now raising the earlier %s error' % exc_info[0] 382 finally: 383 self.job.stderr.restore() 384 self.job.stdout.restore() 385 try: 386 raise exc_info[0], exc_info[1], exc_info[2] 387 finally: 388 # http://docs.python.org/library/sys.html#sys.exc_info 389 # Be nice and prevent a circular reference. 390 del exc_info 391 else: 392 try: 393 if run_cleanup: 394 _cherry_pick_call(self.cleanup, *args, **dargs) 395 finally: 396 self.logger.removeHandler(self.test_handler) 397 self.job.stderr.restore() 398 self.job.stdout.restore() 399 except error.AutotestError: 400 if self.network_destabilizing: 401 self.job.enable_warnings("NETWORK") 402 # Pass already-categorized errors on up. 403 raise 404 except Exception, e: 405 if self.network_destabilizing: 406 self.job.enable_warnings("NETWORK") 407 # Anything else is an ERROR in our own code, not execute(). 408 raise error.UnhandledTestError(e) 409 else: 410 if self.network_destabilizing: 411 self.job.enable_warnings("NETWORK") 412 413 414def _get_nonstar_args(func): 415 """Extract all the (normal) function parameter names. 416 417 Given a function, returns a tuple of parameter names, specifically 418 excluding the * and ** parameters, if the function accepts them. 419 420 @param func: A callable that we want to chose arguments for. 421 422 @return: A tuple of parameters accepted by the function. 423 """ 424 return func.func_code.co_varnames[:func.func_code.co_argcount] 425 426 427def _cherry_pick_args(func, args, dargs): 428 """Sanitize positional and keyword arguments before calling a function. 429 430 Given a callable (func), an argument tuple and a dictionary of keyword 431 arguments, pick only those arguments which the function is prepared to 432 accept and return a new argument tuple and keyword argument dictionary. 433 434 Args: 435 func: A callable that we want to choose arguments for. 436 args: A tuple of positional arguments to consider passing to func. 437 dargs: A dictionary of keyword arguments to consider passing to func. 438 Returns: 439 A tuple of: (args tuple, keyword arguments dictionary) 440 """ 441 # Cherry pick args: 442 if func.func_code.co_flags & 0x04: 443 # func accepts *args, so return the entire args. 444 p_args = args 445 else: 446 p_args = () 447 448 # Cherry pick dargs: 449 if func.func_code.co_flags & 0x08: 450 # func accepts **dargs, so return the entire dargs. 451 p_dargs = dargs 452 else: 453 # Only return the keyword arguments that func accepts. 454 p_dargs = {} 455 for param in _get_nonstar_args(func): 456 if param in dargs: 457 p_dargs[param] = dargs[param] 458 459 return p_args, p_dargs 460 461 462def _cherry_pick_call(func, *args, **dargs): 463 """Cherry picks arguments from args/dargs based on what "func" accepts 464 and calls the function with the picked arguments.""" 465 p_args, p_dargs = _cherry_pick_args(func, args, dargs) 466 return func(*p_args, **p_dargs) 467 468 469def _validate_args(args, dargs, *funcs): 470 """Verify that arguments are appropriate for at least one callable. 471 472 Given a list of callables as additional parameters, verify that 473 the proposed keyword arguments in dargs will each be accepted by at least 474 one of the callables. 475 476 NOTE: args is currently not supported and must be empty. 477 478 Args: 479 args: A tuple of proposed positional arguments. 480 dargs: A dictionary of proposed keyword arguments. 481 *funcs: Callables to be searched for acceptance of args and dargs. 482 Raises: 483 error.AutotestError: if an arg won't be accepted by any of *funcs. 484 """ 485 all_co_flags = 0 486 all_varnames = () 487 for func in funcs: 488 all_co_flags |= func.func_code.co_flags 489 all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount] 490 491 # Check if given args belongs to at least one of the methods below. 492 if len(args) > 0: 493 # Current implementation doesn't allow the use of args. 494 raise error.TestError('Unnamed arguments not accepted. Please ' 495 'call job.run_test with named args only') 496 497 # Check if given dargs belongs to at least one of the methods below. 498 if len(dargs) > 0: 499 if not all_co_flags & 0x08: 500 # no func accepts *dargs, so: 501 for param in dargs: 502 if not param in all_varnames: 503 raise error.AutotestError('Unknown parameter: %s' % param) 504 505 506def _installtest(job, url): 507 (group, name) = job.pkgmgr.get_package_name(url, 'test') 508 509 # Bail if the test is already installed 510 group_dir = os.path.join(job.testdir, "download", group) 511 if os.path.exists(os.path.join(group_dir, name)): 512 return (group, name) 513 514 # If the group directory is missing create it and add 515 # an empty __init__.py so that sub-directories are 516 # considered for import. 517 if not os.path.exists(group_dir): 518 os.mkdir(group_dir) 519 f = file(os.path.join(group_dir, '__init__.py'), 'w+') 520 f.close() 521 522 print name + ": installing test url=" + url 523 tarball = os.path.basename(url) 524 tarball_path = os.path.join(group_dir, tarball) 525 test_dir = os.path.join(group_dir, name) 526 job.pkgmgr.fetch_pkg(tarball, tarball_path, 527 repo_url = os.path.dirname(url)) 528 529 # Create the directory for the test 530 if not os.path.exists(test_dir): 531 os.mkdir(os.path.join(group_dir, name)) 532 533 job.pkgmgr.untar_pkg(tarball_path, test_dir) 534 535 os.remove(tarball_path) 536 537 # For this 'sub-object' to be importable via the name 538 # 'group.name' we need to provide an __init__.py, 539 # so link the main entry point to this. 540 os.symlink(name + '.py', os.path.join(group_dir, name, 541 '__init__.py')) 542 543 # The test is now installed. 544 return (group, name) 545 546 547def _call_test_function(func, *args, **dargs): 548 """Calls a test function and translates exceptions so that errors 549 inside test code are considered test failures.""" 550 try: 551 return func(*args, **dargs) 552 except error.AutotestError: 553 # Pass already-categorized errors on up as is. 554 raise 555 except Exception, e: 556 # Other exceptions must be treated as a FAIL when 557 # raised during the test functions 558 raise error.UnhandledTestFail(e) 559 560 561def runtest(job, url, tag, args, dargs, 562 local_namespace={}, global_namespace={}, 563 before_test_hook=None, after_test_hook=None, 564 before_iteration_hook=None, after_iteration_hook=None): 565 local_namespace = local_namespace.copy() 566 global_namespace = global_namespace.copy() 567 568 # if this is not a plain test name then download and install the 569 # specified test 570 if url.endswith('.tar.bz2'): 571 (group, testname) = _installtest(job, url) 572 bindir = os.path.join(job.testdir, 'download', group, testname) 573 site_bindir = None 574 else: 575 # if the test is local, it can be found in either testdir 576 # or site_testdir. tests in site_testdir override tests 577 # defined in testdir 578 (group, testname) = ('', url) 579 bindir = os.path.join(job.testdir, group, testname) 580 if hasattr(job, 'site_testdir'): 581 site_bindir = os.path.join(job.site_testdir, 582 group, testname) 583 else: 584 site_bindir = None 585 586 # The job object here can be that of a server side job or a client 587 # side job. 'install_pkg' method won't be present for server side 588 # jobs, so do the fetch only if that method is present in the job 589 # obj. 590 if hasattr(job, 'install_pkg'): 591 try: 592 job.install_pkg(testname, 'test', bindir) 593 except packages.PackageInstallError, e: 594 # continue as a fall back mechanism and see if the test code 595 # already exists on the machine 596 pass 597 598 outputdir = os.path.join(job.resultdir, testname) 599 if tag: 600 outputdir += '.' + tag 601 602 # if we can find the test in site_bindir, use this version 603 if site_bindir and os.path.exists(site_bindir): 604 bindir = site_bindir 605 testdir = job.site_testdir 606 elif os.path.exists(bindir): 607 testdir = job.testdir 608 else: 609 raise error.TestError(testname + ': test does not exist') 610 611 local_namespace['job'] = job 612 local_namespace['bindir'] = bindir 613 local_namespace['outputdir'] = outputdir 614 615 if group: 616 sys.path.insert(0, os.path.join(testdir, 'download')) 617 group += '.' 618 else: 619 sys.path.insert(0, os.path.join(testdir, testname)) 620 621 try: 622 exec ("import %s%s" % (group, testname), 623 local_namespace, global_namespace) 624 exec ("mytest = %s%s.%s(job, bindir, outputdir)" % 625 (group, testname, testname), 626 local_namespace, global_namespace) 627 finally: 628 sys.path.pop(0) 629 630 pwd = os.getcwd() 631 os.chdir(outputdir) 632 633 try: 634 mytest = global_namespace['mytest'] 635 if before_test_hook: 636 before_test_hook(mytest) 637 # args/dargs are the test arguments as given by the control file so 638 # we cannot put the iteration hooks inside the dictionary 639 mytest._exec(before_iteration_hook, after_iteration_hook, args, dargs) 640 finally: 641 os.chdir(pwd) 642 if after_test_hook: 643 after_test_hook(mytest) 644 shutil.rmtree(mytest.tmpdir, ignore_errors=True) 645