test.py revision 8beabca0c495df2913ef67adcd63a9be8947ad82
1# Shell class for a test, inherited by all individual tests 2# 3# Methods: 4# __init__ initialise 5# initialize run once for each job 6# setup run once for each new version of the test installed 7# run run the test (wrapped by job.run_test()) 8# 9# Data: 10# job backreference to the job this test instance is part of 11# outputdir eg. results/<job>/<testname.tag> 12# resultsdir eg. results/<job>/<testname.tag>/results 13# profdir eg. results/<job>/<testname.tag>/profiling 14# debugdir eg. results/<job>/<testname.tag>/debug 15# bindir eg. tests/<test> 16# src eg. tests/<test>/src 17# tmpdir eg. tmp/<tempname>_<testname.tag> 18 19import fcntl, os, re, sys, shutil, tarfile, tempfile, time, traceback 20import warnings, logging 21 22from autotest_lib.client.common_lib import error, packages 23from autotest_lib.client.bin import utils 24 25 26class base_test: 27 preserve_srcdir = False 28 network_destabilizing = False 29 30 def __init__(self, job, bindir, outputdir): 31 self.job = job 32 self.pkgmgr = job.pkgmgr 33 self.autodir = job.autodir 34 35 self.outputdir = outputdir 36 self.tagged_testname = os.path.basename(self.outputdir) 37 self.resultsdir = os.path.join(self.outputdir, 'results') 38 os.mkdir(self.resultsdir) 39 self.profdir = os.path.join(self.outputdir, 'profiling') 40 os.mkdir(self.profdir) 41 self.debugdir = os.path.join(self.outputdir, 'debug') 42 os.mkdir(self.debugdir) 43 self.bindir = bindir 44 if hasattr(job, 'libdir'): 45 self.libdir = job.libdir 46 self.srcdir = os.path.join(self.bindir, 'src') 47 self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname, 48 dir=job.tmpdir) 49 self._keyvals = [] 50 self._new_keyval = False 51 self.failed_constraints = [] 52 self.before_iteration_hooks = [] 53 self.after_iteration_hooks = [] 54 55 56 def assert_(self, expr, msg='Assertion failed.'): 57 if not expr: 58 raise error.TestError(msg) 59 60 61 def write_test_keyval(self, attr_dict): 62 utils.write_keyval(self.outputdir, attr_dict) 63 64 65 @staticmethod 66 def _append_type_to_keys(dictionary, typename): 67 new_dict = {} 68 for key, value in dictionary.iteritems(): 69 new_key = "%s{%s}" % (key, typename) 70 new_dict[new_key] = value 71 return new_dict 72 73 74 def write_perf_keyval(self, perf_dict): 75 self.write_iteration_keyval({}, perf_dict) 76 77 78 def write_attr_keyval(self, attr_dict): 79 self.write_iteration_keyval(attr_dict, {}) 80 81 82 def write_iteration_keyval(self, attr_dict, perf_dict): 83 # append the dictionaries before they have the {perf} and {attr} added 84 self._keyvals.append({'attr':attr_dict, 'perf':perf_dict}) 85 self._new_keyval = True 86 87 if attr_dict: 88 attr_dict = self._append_type_to_keys(attr_dict, "attr") 89 utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr") 90 91 if perf_dict: 92 perf_dict = self._append_type_to_keys(perf_dict, "perf") 93 utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf") 94 95 keyval_path = os.path.join(self.resultsdir, "keyval") 96 print >> open(keyval_path, "a"), "" 97 98 99 def analyze_perf_constraints(self, constraints): 100 if not self._new_keyval: 101 return 102 103 self._new_keyval = False 104 failures = [] 105 for constraint in constraints: 106 print "___________________ constraint = %s" % constraint 107 print "___________________ keyvals = %s" % self._keyvals[-1]['perf'] 108 try: 109 if not eval(constraint, self._keyvals[-1]['perf']): 110 failures.append('%s: constraint was not met' % constraint) 111 except: 112 failures.append('could not evaluate constraint: %s' 113 % constraint) 114 115 # keep track of the errors for each iteration 116 self.failed_constraints.append(failures) 117 118 119 def process_failed_constraints(self): 120 msg = '' 121 for i, failures in enumerate(self.failed_constraints): 122 if failures: 123 msg += 'iteration %d:%s ' % (i, ','.join(failures)) 124 125 if msg: 126 raise error.TestFail(msg) 127 128 129 def register_before_iteration_hook(self, iteration_hook): 130 """ 131 This is how we expect test writers to register a before_iteration_hook. 132 This adds the method to the list of hooks which are executed 133 before each iteration. 134 135 @param iteration_hook: Method to run before each iteration. A valid 136 hook accepts a single argument which is the 137 test object. 138 """ 139 self.before_iteration_hooks.append(iteration_hook) 140 141 142 def register_after_iteration_hook(self, iteration_hook): 143 """ 144 This is how we expect test writers to register an after_iteration_hook. 145 This adds the method to the list of hooks which are executed 146 after each iteration. 147 148 @param iteration_hook: Method to run after each iteration. A valid 149 hook accepts a single argument which is the 150 test object. 151 """ 152 self.after_iteration_hooks.append(iteration_hook) 153 154 155 def initialize(self): 156 pass 157 158 159 def setup(self): 160 pass 161 162 163 def warmup(self, *args, **dargs): 164 pass 165 166 167 def drop_caches_between_iterations(self): 168 if self.job.drop_caches_between_iterations: 169 print "Dropping caches between iterations" 170 utils.drop_caches() 171 172 173 def _call_run_once(self, constraints, profile_only, 174 postprocess_profiled_run, args, dargs): 175 self.drop_caches_between_iterations() 176 177 # execute iteration hooks 178 for hook in self.before_iteration_hooks: 179 hook(self) 180 181 if profile_only: 182 self.run_once_profiling(postprocess_profiled_run, *args, **dargs) 183 else: 184 self.run_once(*args, **dargs) 185 186 for hook in self.after_iteration_hooks: 187 hook(self) 188 189 self.postprocess_iteration() 190 self.analyze_perf_constraints(constraints) 191 192 193 def execute(self, iterations=None, test_length=None, profile_only=False, 194 _get_time=time.time, postprocess_profiled_run=None, 195 constraints=(), *args, **dargs): 196 """ 197 This is the basic execute method for the tests inherited from base_test. 198 If you want to implement a benchmark test, it's better to implement 199 the run_once function, to cope with the profiling infrastructure. For 200 other tests, you can just override the default implementation. 201 202 @param test_length: The minimum test length in seconds. We'll run the 203 run_once function for a number of times large enough to cover the 204 minimum test length. 205 206 @param iterations: A number of iterations that we'll run the run_once 207 function. This parameter is incompatible with test_length and will 208 be silently ignored if you specify both. 209 210 @param profile_only: If true run X iterations with profilers enabled. 211 Otherwise run X iterations and one with profiling if profiles are 212 enabled. 213 214 @param _get_time: [time.time] Used for unit test time injection. 215 216 @param postprocess_profiled_run: Run the postprocessing for the 217 profiled run. 218 """ 219 220 # For our special class of tests, the benchmarks, we don't want 221 # profilers to run during the test iterations. Let's reserve only 222 # the last iteration for profiling, if needed. So let's stop 223 # all profilers if they are present and active. 224 profilers = self.job.profilers 225 if profilers.active(): 226 profilers.stop(self) 227 # If the user called this test in an odd way (specified both iterations 228 # and test_length), let's warn them. 229 if iterations and test_length: 230 logging.info('Iterations parameter ignored (timed execution).') 231 if test_length: 232 test_start = _get_time() 233 time_elapsed = 0 234 timed_counter = 0 235 logging.info('Test started. Minimum test length: %d s', 236 test_length) 237 while time_elapsed < test_length: 238 timed_counter = timed_counter + 1 239 if time_elapsed == 0: 240 logging.info('Executing iteration %d', timed_counter) 241 elif time_elapsed > 0: 242 logging.info( 243 'Executing iteration %d, time_elapsed %d s', 244 timed_counter, time_elapsed) 245 self._call_run_once(constraints, profile_only, 246 postprocess_profiled_run, args, dargs) 247 test_iteration_finish = _get_time() 248 time_elapsed = test_iteration_finish - test_start 249 logging.info('Test finished after %d iterations', 250 timed_counter) 251 logging.info('Time elapsed: %d s', time_elapsed) 252 else: 253 if iterations is None: 254 iterations = 1 255 logging.info('Test started. Number of iterations: %d', iterations) 256 for self.iteration in xrange(1, iterations+1): 257 logging.info('Executing iteration %d of %d', self.iteration, 258 iterations) 259 self._call_run_once(constraints, profile_only, 260 postprocess_profiled_run, args, dargs) 261 logging.info('Test finished after %d iterations.', iterations) 262 263 if not profile_only: 264 self.run_once_profiling(postprocess_profiled_run, *args, **dargs) 265 266 # Do any postprocessing, normally extracting performance keyvals, etc 267 self.postprocess() 268 self.process_failed_constraints() 269 270 271 def run_once_profiling(self, postprocess_profiled_run, *args, **dargs): 272 profilers = self.job.profilers 273 # Do a profiling run if necessary 274 if profilers.present(): 275 self.drop_caches_between_iterations() 276 profilers.start(self) 277 print 'Profilers present. Profiling run started' 278 try: 279 self.run_once(*args, **dargs) 280 281 # Priority to the run_once() argument over the attribute. 282 postprocess_attribute = getattr(self, 283 'postprocess_profiled_run', 284 False) 285 286 if (postprocess_profiled_run or 287 (postprocess_profiled_run is None and 288 postprocess_attribute)): 289 self.postprocess_iteration() 290 291 finally: 292 profilers.stop(self) 293 profilers.report(self) 294 295 296 def postprocess(self): 297 pass 298 299 300 def postprocess_iteration(self): 301 pass 302 303 304 def cleanup(self): 305 pass 306 307 308 def _setup_test_logging_handler(self): 309 """ 310 Adds a file handler during test execution, which will give test writers 311 the ability to obtain test results on the test results dir just by 312 making logging calls. 313 """ 314 result_filename = os.path.join(self.resultsdir, 315 '%s.log' % self.tagged_testname) 316 self.test_handler = logging.FileHandler(filename=result_filename, 317 mode='w') 318 fmt_str = '[%(asctime)s - %(module)-15s - %(levelname)-8s] %(message)s' 319 self.test_formatter = logging.Formatter(fmt_str) 320 self.test_handler.setFormatter(self.test_formatter) 321 self.logger = logging.getLogger() 322 self.logger.addHandler(self.test_handler) 323 324 325 def _exec(self, args, dargs): 326 327 self.job.stdout.tee_redirect(os.path.join(self.debugdir, 'stdout')) 328 self.job.stderr.tee_redirect(os.path.join(self.debugdir, 'stderr')) 329 self._setup_test_logging_handler() 330 try: 331 if self.network_destabilizing: 332 self.job.disable_warnings("NETWORK") 333 334 # write out the test attributes into a keyval 335 dargs = dargs.copy() 336 run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup) 337 keyvals = dargs.pop('test_attributes', {}).copy() 338 keyvals['version'] = self.version 339 for i, arg in enumerate(args): 340 keyvals['param-%d' % i] = repr(arg) 341 for name, arg in dargs.iteritems(): 342 keyvals['param-%s' % name] = repr(arg) 343 self.write_test_keyval(keyvals) 344 345 _validate_args(args, dargs, self.initialize, self.setup, 346 self.execute, self.cleanup) 347 348 try: 349 # Initialize: 350 _cherry_pick_call(self.initialize, *args, **dargs) 351 352 lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w') 353 try: 354 fcntl.flock(lockfile, fcntl.LOCK_EX) 355 # Setup: (compile and install the test, if needed) 356 p_args, p_dargs = _cherry_pick_args(self.setup,args,dargs) 357 utils.update_version(self.srcdir, self.preserve_srcdir, 358 self.version, self.setup, 359 *p_args, **p_dargs) 360 finally: 361 fcntl.flock(lockfile, fcntl.LOCK_UN) 362 lockfile.close() 363 364 # Execute: 365 os.chdir(self.outputdir) 366 367 # call self.warmup cherry picking the arguments it accepts and 368 # translate exceptions if needed 369 _call_test_function(_cherry_pick_call, self.warmup, 370 *args, **dargs) 371 372 if hasattr(self, 'run_once'): 373 p_args, p_dargs = _cherry_pick_args(self.run_once, 374 args, dargs) 375 # pull in any non-* and non-** args from self.execute 376 for param in _get_nonstar_args(self.execute): 377 if param in dargs: 378 p_dargs[param] = dargs[param] 379 else: 380 p_args, p_dargs = _cherry_pick_args(self.execute, 381 args, dargs) 382 383 _call_test_function(self.execute, *p_args, **p_dargs) 384 except Exception: 385 # Save the exception while we run our cleanup() before 386 # reraising it. 387 exc_info = sys.exc_info() 388 try: 389 try: 390 if run_cleanup: 391 _cherry_pick_call(self.cleanup, *args, **dargs) 392 except Exception: 393 print 'Ignoring exception during cleanup() phase:' 394 traceback.print_exc() 395 print 'Now raising the earlier %s error' % exc_info[0] 396 finally: 397 self.job.stderr.restore() 398 self.job.stdout.restore() 399 try: 400 raise exc_info[0], exc_info[1], exc_info[2] 401 finally: 402 # http://docs.python.org/library/sys.html#sys.exc_info 403 # Be nice and prevent a circular reference. 404 del exc_info 405 else: 406 try: 407 if run_cleanup: 408 _cherry_pick_call(self.cleanup, *args, **dargs) 409 finally: 410 self.logger.removeHandler(self.test_handler) 411 self.job.stderr.restore() 412 self.job.stdout.restore() 413 except error.AutotestError: 414 if self.network_destabilizing: 415 self.job.enable_warnings("NETWORK") 416 # Pass already-categorized errors on up. 417 raise 418 except Exception, e: 419 if self.network_destabilizing: 420 self.job.enable_warnings("NETWORK") 421 # Anything else is an ERROR in our own code, not execute(). 422 raise error.UnhandledTestError(e) 423 else: 424 if self.network_destabilizing: 425 self.job.enable_warnings("NETWORK") 426 427 428def _get_nonstar_args(func): 429 """Extract all the (normal) function parameter names. 430 431 Given a function, returns a tuple of parameter names, specifically 432 excluding the * and ** parameters, if the function accepts them. 433 434 @param func: A callable that we want to chose arguments for. 435 436 @return: A tuple of parameters accepted by the function. 437 """ 438 return func.func_code.co_varnames[:func.func_code.co_argcount] 439 440 441def _cherry_pick_args(func, args, dargs): 442 """Sanitize positional and keyword arguments before calling a function. 443 444 Given a callable (func), an argument tuple and a dictionary of keyword 445 arguments, pick only those arguments which the function is prepared to 446 accept and return a new argument tuple and keyword argument dictionary. 447 448 Args: 449 func: A callable that we want to choose arguments for. 450 args: A tuple of positional arguments to consider passing to func. 451 dargs: A dictionary of keyword arguments to consider passing to func. 452 Returns: 453 A tuple of: (args tuple, keyword arguments dictionary) 454 """ 455 # Cherry pick args: 456 if func.func_code.co_flags & 0x04: 457 # func accepts *args, so return the entire args. 458 p_args = args 459 else: 460 p_args = () 461 462 # Cherry pick dargs: 463 if func.func_code.co_flags & 0x08: 464 # func accepts **dargs, so return the entire dargs. 465 p_dargs = dargs 466 else: 467 # Only return the keyword arguments that func accepts. 468 p_dargs = {} 469 for param in _get_nonstar_args(func): 470 if param in dargs: 471 p_dargs[param] = dargs[param] 472 473 return p_args, p_dargs 474 475 476def _cherry_pick_call(func, *args, **dargs): 477 """Cherry picks arguments from args/dargs based on what "func" accepts 478 and calls the function with the picked arguments.""" 479 p_args, p_dargs = _cherry_pick_args(func, args, dargs) 480 return func(*p_args, **p_dargs) 481 482 483def _validate_args(args, dargs, *funcs): 484 """Verify that arguments are appropriate for at least one callable. 485 486 Given a list of callables as additional parameters, verify that 487 the proposed keyword arguments in dargs will each be accepted by at least 488 one of the callables. 489 490 NOTE: args is currently not supported and must be empty. 491 492 Args: 493 args: A tuple of proposed positional arguments. 494 dargs: A dictionary of proposed keyword arguments. 495 *funcs: Callables to be searched for acceptance of args and dargs. 496 Raises: 497 error.AutotestError: if an arg won't be accepted by any of *funcs. 498 """ 499 all_co_flags = 0 500 all_varnames = () 501 for func in funcs: 502 all_co_flags |= func.func_code.co_flags 503 all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount] 504 505 # Check if given args belongs to at least one of the methods below. 506 if len(args) > 0: 507 # Current implementation doesn't allow the use of args. 508 raise error.TestError('Unnamed arguments not accepted. Please ' 509 'call job.run_test with named args only') 510 511 # Check if given dargs belongs to at least one of the methods below. 512 if len(dargs) > 0: 513 if not all_co_flags & 0x08: 514 # no func accepts *dargs, so: 515 for param in dargs: 516 if not param in all_varnames: 517 raise error.AutotestError('Unknown parameter: %s' % param) 518 519 520def _installtest(job, url): 521 (group, name) = job.pkgmgr.get_package_name(url, 'test') 522 523 # Bail if the test is already installed 524 group_dir = os.path.join(job.testdir, "download", group) 525 if os.path.exists(os.path.join(group_dir, name)): 526 return (group, name) 527 528 # If the group directory is missing create it and add 529 # an empty __init__.py so that sub-directories are 530 # considered for import. 531 if not os.path.exists(group_dir): 532 os.mkdir(group_dir) 533 f = file(os.path.join(group_dir, '__init__.py'), 'w+') 534 f.close() 535 536 print name + ": installing test url=" + url 537 tarball = os.path.basename(url) 538 tarball_path = os.path.join(group_dir, tarball) 539 test_dir = os.path.join(group_dir, name) 540 job.pkgmgr.fetch_pkg(tarball, tarball_path, 541 repo_url = os.path.dirname(url)) 542 543 # Create the directory for the test 544 if not os.path.exists(test_dir): 545 os.mkdir(os.path.join(group_dir, name)) 546 547 job.pkgmgr.untar_pkg(tarball_path, test_dir) 548 549 os.remove(tarball_path) 550 551 # For this 'sub-object' to be importable via the name 552 # 'group.name' we need to provide an __init__.py, 553 # so link the main entry point to this. 554 os.symlink(name + '.py', os.path.join(group_dir, name, 555 '__init__.py')) 556 557 # The test is now installed. 558 return (group, name) 559 560 561def _call_test_function(func, *args, **dargs): 562 """Calls a test function and translates exceptions so that errors 563 inside test code are considered test failures.""" 564 try: 565 return func(*args, **dargs) 566 except error.AutotestError: 567 # Pass already-categorized errors on up as is. 568 raise 569 except Exception, e: 570 # Other exceptions must be treated as a FAIL when 571 # raised during the test functions 572 raise error.UnhandledTestFail(e) 573 574 575def runtest(job, url, tag, args, dargs, 576 local_namespace={}, global_namespace={}, 577 before_test_hook=None, after_test_hook=None, 578 before_iteration_hook=None, after_iteration_hook=None): 579 local_namespace = local_namespace.copy() 580 global_namespace = global_namespace.copy() 581 582 # if this is not a plain test name then download and install the 583 # specified test 584 if url.endswith('.tar.bz2'): 585 (group, testname) = _installtest(job, url) 586 bindir = os.path.join(job.testdir, 'download', group, testname) 587 site_bindir = None 588 else: 589 # if the test is local, it can be found in either testdir 590 # or site_testdir. tests in site_testdir override tests 591 # defined in testdir 592 (group, testname) = ('', url) 593 bindir = os.path.join(job.testdir, group, testname) 594 if hasattr(job, 'site_testdir'): 595 site_bindir = os.path.join(job.site_testdir, 596 group, testname) 597 else: 598 site_bindir = None 599 600 # The job object here can be that of a server side job or a client 601 # side job. 'install_pkg' method won't be present for server side 602 # jobs, so do the fetch only if that method is present in the job 603 # obj. 604 if hasattr(job, 'install_pkg'): 605 try: 606 job.install_pkg(testname, 'test', bindir) 607 except packages.PackageInstallError, e: 608 # continue as a fall back mechanism and see if the test code 609 # already exists on the machine 610 pass 611 612 outputdir = os.path.join(job.resultdir, testname) 613 if tag: 614 outputdir += '.' + tag 615 616 # if we can find the test in site_bindir, use this version 617 if site_bindir and os.path.exists(site_bindir): 618 bindir = site_bindir 619 testdir = job.site_testdir 620 elif os.path.exists(bindir): 621 testdir = job.testdir 622 else: 623 raise error.TestError(testname + ': test does not exist') 624 625 local_namespace['job'] = job 626 local_namespace['bindir'] = bindir 627 local_namespace['outputdir'] = outputdir 628 629 if group: 630 sys.path.insert(0, os.path.join(testdir, 'download')) 631 group += '.' 632 else: 633 sys.path.insert(0, os.path.join(testdir, testname)) 634 635 try: 636 exec ("import %s%s" % (group, testname), 637 local_namespace, global_namespace) 638 exec ("mytest = %s%s.%s(job, bindir, outputdir)" % 639 (group, testname, testname), 640 local_namespace, global_namespace) 641 finally: 642 sys.path.pop(0) 643 644 pwd = os.getcwd() 645 os.chdir(outputdir) 646 647 try: 648 mytest = global_namespace['mytest'] 649 if before_test_hook: 650 before_test_hook(mytest) 651 652 # we use the register iteration hooks methods to register the passed 653 # in hooks 654 if before_iteration_hook: 655 mytest.register_before_iteration_hook(before_iteration_hook) 656 if after_iteration_hook: 657 mytest.register_after_iteration_hook(after_iteration_hook) 658 mytest._exec(args, dargs) 659 finally: 660 os.chdir(pwd) 661 if after_test_hook: 662 after_test_hook(mytest) 663 shutil.rmtree(mytest.tmpdir, ignore_errors=True) 664