test.py revision 5e703a286f9f112c419a27402b232e78bfd7bdc8
1# Shell class for a test, inherited by all individual tests 2# 3# Methods: 4# __init__ initialise 5# initialize run once for each job 6# setup run once for each new version of the test installed 7# run run the test (wrapped by job.run_test()) 8# 9# Data: 10# job backreference to the job this test instance is part of 11# outputdir eg. results/<job>/<testname.tag> 12# resultsdir eg. results/<job>/<testname.tag>/results 13# profdir eg. results/<job>/<testname.tag>/profiling 14# debugdir eg. results/<job>/<testname.tag>/debug 15# bindir eg. tests/<test> 16# src eg. tests/<test>/src 17# tmpdir eg. tmp/<tempname>_<testname.tag> 18 19import fcntl, os, re, sys, shutil, tarfile, tempfile, time, traceback 20import warnings, logging 21 22from autotest_lib.client.common_lib import error, packages 23from autotest_lib.client.bin import utils 24 25 26class base_test: 27 preserve_srcdir = False 28 network_destabilizing = False 29 30 def __init__(self, job, bindir, outputdir): 31 self.job = job 32 self.pkgmgr = job.pkgmgr 33 self.autodir = job.autodir 34 35 self.outputdir = outputdir 36 self.tagged_testname = os.path.basename(self.outputdir) 37 self.resultsdir = os.path.join(self.outputdir, 'results') 38 os.mkdir(self.resultsdir) 39 self.profdir = os.path.join(self.outputdir, 'profiling') 40 os.mkdir(self.profdir) 41 self.debugdir = os.path.join(self.outputdir, 'debug') 42 os.mkdir(self.debugdir) 43 self.bindir = bindir 44 if hasattr(job, 'libdir'): 45 self.libdir = job.libdir 46 self.srcdir = os.path.join(self.bindir, 'src') 47 self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname, 48 dir=job.tmpdir) 49 self._keyvals = [] 50 self._new_keyval = False 51 self.failed_constraints = [] 52 self.iteration = 0 53 self.before_iteration_hooks = [] 54 self.after_iteration_hooks = [] 55 56 57 def assert_(self, expr, msg='Assertion failed.'): 58 if not expr: 59 raise error.TestError(msg) 60 61 62 def write_test_keyval(self, attr_dict): 63 utils.write_keyval(self.outputdir, attr_dict) 64 65 66 @staticmethod 67 def _append_type_to_keys(dictionary, typename): 68 new_dict = {} 69 for key, value in dictionary.iteritems(): 70 new_key = "%s{%s}" % (key, typename) 71 new_dict[new_key] = value 72 return new_dict 73 74 75 def write_perf_keyval(self, perf_dict): 76 self.write_iteration_keyval({}, perf_dict) 77 78 79 def write_attr_keyval(self, attr_dict): 80 self.write_iteration_keyval(attr_dict, {}) 81 82 83 def write_iteration_keyval(self, attr_dict, perf_dict): 84 # append the dictionaries before they have the {perf} and {attr} added 85 self._keyvals.append({'attr':attr_dict, 'perf':perf_dict}) 86 self._new_keyval = True 87 88 if attr_dict: 89 attr_dict = self._append_type_to_keys(attr_dict, "attr") 90 utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr") 91 92 if perf_dict: 93 perf_dict = self._append_type_to_keys(perf_dict, "perf") 94 utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf") 95 96 keyval_path = os.path.join(self.resultsdir, "keyval") 97 print >> open(keyval_path, "a"), "" 98 99 100 def analyze_perf_constraints(self, constraints): 101 if not self._new_keyval: 102 return 103 104 self._new_keyval = False 105 failures = [] 106 for constraint in constraints: 107 print "___________________ constraint = %s" % constraint 108 print "___________________ keyvals = %s" % self._keyvals[-1]['perf'] 109 try: 110 if not eval(constraint, self._keyvals[-1]['perf']): 111 failures.append('%s: constraint was not met' % constraint) 112 except: 113 failures.append('could not evaluate constraint: %s' 114 % constraint) 115 116 # keep track of the errors for each iteration 117 self.failed_constraints.append(failures) 118 119 120 def process_failed_constraints(self): 121 msg = '' 122 for i, failures in enumerate(self.failed_constraints): 123 if failures: 124 msg += 'iteration %d:%s ' % (i, ','.join(failures)) 125 126 if msg: 127 raise error.TestFail(msg) 128 129 130 def register_before_iteration_hook(self, iteration_hook): 131 """ 132 This is how we expect test writers to register a before_iteration_hook. 133 This adds the method to the list of hooks which are executed 134 before each iteration. 135 136 @param iteration_hook: Method to run before each iteration. A valid 137 hook accepts a single argument which is the 138 test object. 139 """ 140 self.before_iteration_hooks.append(iteration_hook) 141 142 143 def register_after_iteration_hook(self, iteration_hook): 144 """ 145 This is how we expect test writers to register an after_iteration_hook. 146 This adds the method to the list of hooks which are executed 147 after each iteration. 148 149 @param iteration_hook: Method to run after each iteration. A valid 150 hook accepts a single argument which is the 151 test object. 152 """ 153 self.after_iteration_hooks.append(iteration_hook) 154 155 156 def initialize(self): 157 pass 158 159 160 def setup(self): 161 pass 162 163 164 def warmup(self, *args, **dargs): 165 pass 166 167 168 def drop_caches_between_iterations(self): 169 if self.job.drop_caches_between_iterations: 170 print "Dropping caches between iterations" 171 utils.drop_caches() 172 173 174 def _call_run_once(self, constraints, profile_only, 175 postprocess_profiled_run, args, dargs): 176 self.drop_caches_between_iterations() 177 178 # execute iteration hooks 179 for hook in self.before_iteration_hooks: 180 hook(self) 181 182 if profile_only: 183 self.run_once_profiling(postprocess_profiled_run, *args, **dargs) 184 else: 185 self.run_once(*args, **dargs) 186 187 for hook in self.after_iteration_hooks: 188 hook(self) 189 190 self.postprocess_iteration() 191 self.analyze_perf_constraints(constraints) 192 193 194 def execute(self, iterations=None, test_length=None, profile_only=False, 195 _get_time=time.time, postprocess_profiled_run=None, 196 constraints=(), *args, **dargs): 197 """ 198 This is the basic execute method for the tests inherited from base_test. 199 If you want to implement a benchmark test, it's better to implement 200 the run_once function, to cope with the profiling infrastructure. For 201 other tests, you can just override the default implementation. 202 203 @param test_length: The minimum test length in seconds. We'll run the 204 run_once function for a number of times large enough to cover the 205 minimum test length. 206 207 @param iterations: A number of iterations that we'll run the run_once 208 function. This parameter is incompatible with test_length and will 209 be silently ignored if you specify both. 210 211 @param profile_only: If true run X iterations with profilers enabled. 212 Otherwise run X iterations and one with profiling if profiles are 213 enabled. 214 215 @param _get_time: [time.time] Used for unit test time injection. 216 217 @param postprocess_profiled_run: Run the postprocessing for the 218 profiled run. 219 """ 220 221 # For our special class of tests, the benchmarks, we don't want 222 # profilers to run during the test iterations. Let's reserve only 223 # the last iteration for profiling, if needed. So let's stop 224 # all profilers if they are present and active. 225 profilers = self.job.profilers 226 if profilers.active(): 227 profilers.stop(self) 228 # If the user called this test in an odd way (specified both iterations 229 # and test_length), let's warn them. 230 if iterations and test_length: 231 logging.info('Iterations parameter ignored (timed execution).') 232 if test_length: 233 test_start = _get_time() 234 time_elapsed = 0 235 timed_counter = 0 236 logging.info('Test started. Minimum test length: %d s', 237 test_length) 238 while time_elapsed < test_length: 239 timed_counter = timed_counter + 1 240 if time_elapsed == 0: 241 logging.info('Executing iteration %d', timed_counter) 242 elif time_elapsed > 0: 243 logging.info( 244 'Executing iteration %d, time_elapsed %d s', 245 timed_counter, time_elapsed) 246 self._call_run_once(constraints, profile_only, 247 postprocess_profiled_run, args, dargs) 248 test_iteration_finish = _get_time() 249 time_elapsed = test_iteration_finish - test_start 250 logging.info('Test finished after %d iterations', 251 timed_counter) 252 logging.info('Time elapsed: %d s', time_elapsed) 253 else: 254 if iterations is None: 255 iterations = 1 256 logging.info('Test started. Number of iterations: %d', iterations) 257 for self.iteration in xrange(1, iterations+1): 258 logging.info('Executing iteration %d of %d', self.iteration, 259 iterations) 260 self._call_run_once(constraints, profile_only, 261 postprocess_profiled_run, args, dargs) 262 logging.info('Test finished after %d iterations.', iterations) 263 264 if not profile_only: 265 self.iteration += 1 266 self.run_once_profiling(postprocess_profiled_run, *args, **dargs) 267 268 # Do any postprocessing, normally extracting performance keyvals, etc 269 self.postprocess() 270 self.process_failed_constraints() 271 272 273 def run_once_profiling(self, postprocess_profiled_run, *args, **dargs): 274 profilers = self.job.profilers 275 # Do a profiling run if necessary 276 if profilers.present(): 277 self.drop_caches_between_iterations() 278 profilers.start(self) 279 print 'Profilers present. Profiling run started' 280 try: 281 self.run_once(*args, **dargs) 282 283 # Priority to the run_once() argument over the attribute. 284 postprocess_attribute = getattr(self, 285 'postprocess_profiled_run', 286 False) 287 288 if (postprocess_profiled_run or 289 (postprocess_profiled_run is None and 290 postprocess_attribute)): 291 self.postprocess_iteration() 292 293 finally: 294 profilers.stop(self) 295 profilers.report(self) 296 297 298 def postprocess(self): 299 pass 300 301 302 def postprocess_iteration(self): 303 pass 304 305 306 def cleanup(self): 307 pass 308 309 310 def _setup_test_logging_handler(self): 311 """ 312 Adds a file handler during test execution, which will give test writers 313 the ability to obtain test results on the test results dir just by 314 making logging calls. 315 """ 316 result_filename = os.path.join(self.resultsdir, 317 '%s.log' % self.tagged_testname) 318 self.test_handler = logging.FileHandler(filename=result_filename, 319 mode='w') 320 fmt_str = '[%(asctime)s - %(module)-15s - %(levelname)-8s] %(message)s' 321 self.test_formatter = logging.Formatter(fmt_str) 322 self.test_handler.setFormatter(self.test_formatter) 323 self.logger = logging.getLogger() 324 self.logger.addHandler(self.test_handler) 325 326 327 def _exec(self, args, dargs): 328 self.job.logging.tee_redirect_debug_dir(self.debugdir) 329 self._setup_test_logging_handler() 330 try: 331 if self.network_destabilizing: 332 self.job.disable_warnings("NETWORK") 333 334 # write out the test attributes into a keyval 335 dargs = dargs.copy() 336 run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup) 337 keyvals = dargs.pop('test_attributes', {}).copy() 338 keyvals['version'] = self.version 339 for i, arg in enumerate(args): 340 keyvals['param-%d' % i] = repr(arg) 341 for name, arg in dargs.iteritems(): 342 keyvals['param-%s' % name] = repr(arg) 343 self.write_test_keyval(keyvals) 344 345 _validate_args(args, dargs, self.initialize, self.setup, 346 self.execute, self.cleanup) 347 348 try: 349 # Initialize: 350 _cherry_pick_call(self.initialize, *args, **dargs) 351 352 lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w') 353 try: 354 fcntl.flock(lockfile, fcntl.LOCK_EX) 355 # Setup: (compile and install the test, if needed) 356 p_args, p_dargs = _cherry_pick_args(self.setup,args,dargs) 357 utils.update_version(self.srcdir, self.preserve_srcdir, 358 self.version, self.setup, 359 *p_args, **p_dargs) 360 finally: 361 fcntl.flock(lockfile, fcntl.LOCK_UN) 362 lockfile.close() 363 364 # Execute: 365 os.chdir(self.outputdir) 366 367 # call self.warmup cherry picking the arguments it accepts and 368 # translate exceptions if needed 369 _call_test_function(_cherry_pick_call, self.warmup, 370 *args, **dargs) 371 372 if hasattr(self, 'run_once'): 373 p_args, p_dargs = _cherry_pick_args(self.run_once, 374 args, dargs) 375 # pull in any non-* and non-** args from self.execute 376 for param in _get_nonstar_args(self.execute): 377 if param in dargs: 378 p_dargs[param] = dargs[param] 379 else: 380 p_args, p_dargs = _cherry_pick_args(self.execute, 381 args, dargs) 382 383 _call_test_function(self.execute, *p_args, **p_dargs) 384 except Exception: 385 # Save the exception while we run our cleanup() before 386 # reraising it. 387 exc_info = sys.exc_info() 388 try: 389 try: 390 if run_cleanup: 391 _cherry_pick_call(self.cleanup, *args, **dargs) 392 except Exception: 393 print 'Ignoring exception during cleanup() phase:' 394 traceback.print_exc() 395 print 'Now raising the earlier %s error' % exc_info[0] 396 finally: 397 self.job.logging.restore() 398 try: 399 raise exc_info[0], exc_info[1], exc_info[2] 400 finally: 401 # http://docs.python.org/library/sys.html#sys.exc_info 402 # Be nice and prevent a circular reference. 403 del exc_info 404 else: 405 try: 406 if run_cleanup: 407 _cherry_pick_call(self.cleanup, *args, **dargs) 408 finally: 409 self.logger.removeHandler(self.test_handler) 410 self.job.logging.restore() 411 except error.AutotestError: 412 if self.network_destabilizing: 413 self.job.enable_warnings("NETWORK") 414 # Pass already-categorized errors on up. 415 raise 416 except Exception, e: 417 if self.network_destabilizing: 418 self.job.enable_warnings("NETWORK") 419 # Anything else is an ERROR in our own code, not execute(). 420 raise error.UnhandledTestError(e) 421 else: 422 if self.network_destabilizing: 423 self.job.enable_warnings("NETWORK") 424 425 426def _get_nonstar_args(func): 427 """Extract all the (normal) function parameter names. 428 429 Given a function, returns a tuple of parameter names, specifically 430 excluding the * and ** parameters, if the function accepts them. 431 432 @param func: A callable that we want to chose arguments for. 433 434 @return: A tuple of parameters accepted by the function. 435 """ 436 return func.func_code.co_varnames[:func.func_code.co_argcount] 437 438 439def _cherry_pick_args(func, args, dargs): 440 """Sanitize positional and keyword arguments before calling a function. 441 442 Given a callable (func), an argument tuple and a dictionary of keyword 443 arguments, pick only those arguments which the function is prepared to 444 accept and return a new argument tuple and keyword argument dictionary. 445 446 Args: 447 func: A callable that we want to choose arguments for. 448 args: A tuple of positional arguments to consider passing to func. 449 dargs: A dictionary of keyword arguments to consider passing to func. 450 Returns: 451 A tuple of: (args tuple, keyword arguments dictionary) 452 """ 453 # Cherry pick args: 454 if func.func_code.co_flags & 0x04: 455 # func accepts *args, so return the entire args. 456 p_args = args 457 else: 458 p_args = () 459 460 # Cherry pick dargs: 461 if func.func_code.co_flags & 0x08: 462 # func accepts **dargs, so return the entire dargs. 463 p_dargs = dargs 464 else: 465 # Only return the keyword arguments that func accepts. 466 p_dargs = {} 467 for param in _get_nonstar_args(func): 468 if param in dargs: 469 p_dargs[param] = dargs[param] 470 471 return p_args, p_dargs 472 473 474def _cherry_pick_call(func, *args, **dargs): 475 """Cherry picks arguments from args/dargs based on what "func" accepts 476 and calls the function with the picked arguments.""" 477 p_args, p_dargs = _cherry_pick_args(func, args, dargs) 478 return func(*p_args, **p_dargs) 479 480 481def _validate_args(args, dargs, *funcs): 482 """Verify that arguments are appropriate for at least one callable. 483 484 Given a list of callables as additional parameters, verify that 485 the proposed keyword arguments in dargs will each be accepted by at least 486 one of the callables. 487 488 NOTE: args is currently not supported and must be empty. 489 490 Args: 491 args: A tuple of proposed positional arguments. 492 dargs: A dictionary of proposed keyword arguments. 493 *funcs: Callables to be searched for acceptance of args and dargs. 494 Raises: 495 error.AutotestError: if an arg won't be accepted by any of *funcs. 496 """ 497 all_co_flags = 0 498 all_varnames = () 499 for func in funcs: 500 all_co_flags |= func.func_code.co_flags 501 all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount] 502 503 # Check if given args belongs to at least one of the methods below. 504 if len(args) > 0: 505 # Current implementation doesn't allow the use of args. 506 raise error.TestError('Unnamed arguments not accepted. Please ' 507 'call job.run_test with named args only') 508 509 # Check if given dargs belongs to at least one of the methods below. 510 if len(dargs) > 0: 511 if not all_co_flags & 0x08: 512 # no func accepts *dargs, so: 513 for param in dargs: 514 if not param in all_varnames: 515 raise error.AutotestError('Unknown parameter: %s' % param) 516 517 518def _installtest(job, url): 519 (group, name) = job.pkgmgr.get_package_name(url, 'test') 520 521 # Bail if the test is already installed 522 group_dir = os.path.join(job.testdir, "download", group) 523 if os.path.exists(os.path.join(group_dir, name)): 524 return (group, name) 525 526 # If the group directory is missing create it and add 527 # an empty __init__.py so that sub-directories are 528 # considered for import. 529 if not os.path.exists(group_dir): 530 os.mkdir(group_dir) 531 f = file(os.path.join(group_dir, '__init__.py'), 'w+') 532 f.close() 533 534 print name + ": installing test url=" + url 535 tarball = os.path.basename(url) 536 tarball_path = os.path.join(group_dir, tarball) 537 test_dir = os.path.join(group_dir, name) 538 job.pkgmgr.fetch_pkg(tarball, tarball_path, 539 repo_url = os.path.dirname(url)) 540 541 # Create the directory for the test 542 if not os.path.exists(test_dir): 543 os.mkdir(os.path.join(group_dir, name)) 544 545 job.pkgmgr.untar_pkg(tarball_path, test_dir) 546 547 os.remove(tarball_path) 548 549 # For this 'sub-object' to be importable via the name 550 # 'group.name' we need to provide an __init__.py, 551 # so link the main entry point to this. 552 os.symlink(name + '.py', os.path.join(group_dir, name, 553 '__init__.py')) 554 555 # The test is now installed. 556 return (group, name) 557 558 559def _call_test_function(func, *args, **dargs): 560 """Calls a test function and translates exceptions so that errors 561 inside test code are considered test failures.""" 562 try: 563 return func(*args, **dargs) 564 except error.AutotestError: 565 # Pass already-categorized errors on up as is. 566 raise 567 except Exception, e: 568 # Other exceptions must be treated as a FAIL when 569 # raised during the test functions 570 raise error.UnhandledTestFail(e) 571 572 573def runtest(job, url, tag, args, dargs, 574 local_namespace={}, global_namespace={}, 575 before_test_hook=None, after_test_hook=None, 576 before_iteration_hook=None, after_iteration_hook=None): 577 local_namespace = local_namespace.copy() 578 global_namespace = global_namespace.copy() 579 580 # if this is not a plain test name then download and install the 581 # specified test 582 if url.endswith('.tar.bz2'): 583 (group, testname) = _installtest(job, url) 584 bindir = os.path.join(job.testdir, 'download', group, testname) 585 site_bindir = None 586 else: 587 # if the test is local, it can be found in either testdir 588 # or site_testdir. tests in site_testdir override tests 589 # defined in testdir 590 (group, testname) = ('', url) 591 bindir = os.path.join(job.testdir, group, testname) 592 if hasattr(job, 'site_testdir'): 593 site_bindir = os.path.join(job.site_testdir, 594 group, testname) 595 else: 596 site_bindir = None 597 598 # The job object here can be that of a server side job or a client 599 # side job. 'install_pkg' method won't be present for server side 600 # jobs, so do the fetch only if that method is present in the job 601 # obj. 602 if hasattr(job, 'install_pkg'): 603 try: 604 job.install_pkg(testname, 'test', bindir) 605 except packages.PackageInstallError, e: 606 # continue as a fall back mechanism and see if the test code 607 # already exists on the machine 608 pass 609 610 outputdir = os.path.join(job.resultdir, testname) 611 if tag: 612 outputdir += '.' + tag 613 614 # if we can find the test in site_bindir, use this version 615 if site_bindir and os.path.exists(site_bindir): 616 bindir = site_bindir 617 testdir = job.site_testdir 618 elif os.path.exists(bindir): 619 testdir = job.testdir 620 else: 621 raise error.TestError(testname + ': test does not exist') 622 623 local_namespace['job'] = job 624 local_namespace['bindir'] = bindir 625 local_namespace['outputdir'] = outputdir 626 627 if group: 628 sys.path.insert(0, os.path.join(testdir, 'download')) 629 group += '.' 630 else: 631 sys.path.insert(0, os.path.join(testdir, testname)) 632 633 try: 634 exec ("import %s%s" % (group, testname), 635 local_namespace, global_namespace) 636 exec ("mytest = %s%s.%s(job, bindir, outputdir)" % 637 (group, testname, testname), 638 local_namespace, global_namespace) 639 finally: 640 sys.path.pop(0) 641 642 pwd = os.getcwd() 643 os.chdir(outputdir) 644 645 try: 646 mytest = global_namespace['mytest'] 647 if before_test_hook: 648 before_test_hook(mytest) 649 650 # we use the register iteration hooks methods to register the passed 651 # in hooks 652 if before_iteration_hook: 653 mytest.register_before_iteration_hook(before_iteration_hook) 654 if after_iteration_hook: 655 mytest.register_after_iteration_hook(after_iteration_hook) 656 mytest._exec(args, dargs) 657 finally: 658 os.chdir(pwd) 659 if after_test_hook: 660 after_test_hook(mytest) 661 shutil.rmtree(mytest.tmpdir, ignore_errors=True) 662