test.py revision 8b2954a7693afd2927dde072326832094ad8565f
# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialise
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       run             run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       src             eg. tests/<test>/src
#       tmpdir          eg. tmp/<tempname>_<testname.tag>

import fcntl, getpass, os, re, sys, shutil, tarfile, tempfile, time, traceback
import warnings, logging, glob, resource

from autotest_lib.client.common_lib import error
from autotest_lib.client.bin import utils


class base_test(object):
    preserve_srcdir = False
    network_destabilizing = False

    def __init__(self, job, bindir, outputdir):
        self.job = job
        self.pkgmgr = job.pkgmgr
        self.autodir = job.autodir
        self.outputdir = outputdir
        self.tagged_testname = os.path.basename(self.outputdir)
        self.resultsdir = os.path.join(self.outputdir, 'results')
        os.mkdir(self.resultsdir)
        self.profdir = os.path.join(self.outputdir, 'profiling')
        os.mkdir(self.profdir)
        self.debugdir = os.path.join(self.outputdir, 'debug')
        os.mkdir(self.debugdir)
        # TODO(ericli): figure out how the autotest crash handler works with
        # the cros crash handler; we should restore it in the near term.
#        if getpass.getuser() == 'root':
#            self.configure_crash_handler()
#        else:
        self.crash_handling_enabled = False
        self.bindir = bindir
        self.srcdir = os.path.join(self.bindir, 'src')
        self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname,
                                       dir=job.tmpdir)
        self._keyvals = []
        self._new_keyval = False
        self.failed_constraints = []
        self.iteration = 0
        self.before_iteration_hooks = []
        self.after_iteration_hooks = []


    def configure_crash_handler(self):
        pass


    def crash_handler_report(self):
        pass


    def assert_(self, expr, msg='Assertion failed.'):
        if not expr:
            raise error.TestError(msg)


    def write_test_keyval(self, attr_dict):
        utils.write_keyval(self.outputdir, attr_dict)


    @staticmethod
    def _append_type_to_keys(dictionary, typename):
        new_dict = {}
        for key, value in dictionary.iteritems():
            new_key = "%s{%s}" % (key, typename)
            new_dict[new_key] = value
        return new_dict


    def write_perf_keyval(self, perf_dict):
        self.write_iteration_keyval({}, perf_dict)


    def write_attr_keyval(self, attr_dict):
        self.write_iteration_keyval(attr_dict, {})


    def write_iteration_keyval(self, attr_dict, perf_dict):
        # append the dictionaries before they have the {perf} and {attr} added
        self._keyvals.append({'attr': attr_dict, 'perf': perf_dict})
        self._new_keyval = True

        if attr_dict:
            attr_dict = self._append_type_to_keys(attr_dict, "attr")
            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr")

        if perf_dict:
            perf_dict = self._append_type_to_keys(perf_dict, "perf")
            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf")

        # Append a blank line to separate this iteration's keyvals from the
        # next, closing the file handle rather than leaking it.
        keyval_path = os.path.join(self.resultsdir, "keyval")
        keyval_file = open(keyval_path, "a")
        print >> keyval_file, ""
        keyval_file.close()
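    # A hedged illustration (hypothetical test code, not part of this module):
    # a test's run_once() or postprocess_iteration() might call
    #     self.write_perf_keyval({'throughput_mbps': 94.2})
    #     self.write_attr_keyval({'kernel': '3.4.0'})
    # and the keys would land in results/<job>/<testname.tag>/results/keyval
    # with their type appended, e.g. throughput_mbps{perf} and kernel{attr}.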


    def analyze_perf_constraints(self, constraints):
        if not self._new_keyval:
            return

        # create a dict from the keyvals suitable as an environment for eval
        keyval_env = self._keyvals[-1]['perf'].copy()
        keyval_env['__builtins__'] = None
        self._new_keyval = False
        failures = []

        # evaluate each constraint using the current keyvals
        for constraint in constraints:
            logging.info('___________________ constraint = %s', constraint)
            logging.info('___________________ keyvals = %s', keyval_env)

            try:
                if not eval(constraint, keyval_env):
                    failures.append('%s: constraint was not met' % constraint)
            except:
                failures.append('could not evaluate constraint: %s'
                                % constraint)

        # keep track of the errors for each iteration
        self.failed_constraints.append(failures)
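    # A hedged example of a constraint (the key names are hypothetical): each
    # entry in 'constraints' is a plain Python expression evaluated against
    # the perf keyvals of the latest iteration, for instance
    #     constraints = ['throughput_mbps >= 100', 'latency_ms < 5.0']
    # where throughput_mbps and latency_ms would have been written via
    # write_perf_keyval() during that iteration.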


    def process_failed_constraints(self):
        msg = ''
        for i, failures in enumerate(self.failed_constraints):
            if failures:
                msg += 'iteration %d:%s  ' % (i, ','.join(failures))

        if msg:
            raise error.TestFail(msg)


    def register_before_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register a before_iteration_hook.
        This adds the method to the list of hooks which are executed
        before each iteration.

        @param iteration_hook: Method to run before each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.before_iteration_hooks.append(iteration_hook)


    def register_after_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register an after_iteration_hook.
        This adds the method to the list of hooks which are executed
        after each iteration.

        @param iteration_hook: Method to run after each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.after_iteration_hooks.append(iteration_hook)
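    # A minimal sketch of hook registration (hypothetical test code, not part
    # of this module):
    #     def log_iteration(test):
    #         logging.info('about to run iteration %d of %s',
    #                      test.iteration, test.tagged_testname)
    #
    #     class hypothetical_test(base_test):
    #         def initialize(self):
    #             self.register_before_iteration_hook(log_iteration)
    # Each hook receives the test object as its only argument and is called
    # around every run_once() invocation.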


    def initialize(self):
        pass


    def setup(self):
        pass


    def warmup(self, *args, **dargs):
        pass


    def drop_caches_between_iterations(self):
        if self.job.drop_caches_between_iterations:
            print "Dropping caches between iterations"
            utils.drop_caches()


    def _call_run_once(self, constraints, profile_only,
                       postprocess_profiled_run, args, dargs):
        self.drop_caches_between_iterations()

        # execute iteration hooks
        for hook in self.before_iteration_hooks:
            hook(self)

        if profile_only:
            if not self.job.profilers.present():
                self.job.record('WARN', None, None, 'No profilers have been '
                                'added but profile_only is set - nothing '
                                'will be run')
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)
        else:
            self.before_run_once()
            self.run_once(*args, **dargs)
            self.after_run_once()

        for hook in self.after_iteration_hooks:
            hook(self)

        self.postprocess_iteration()
        self.analyze_perf_constraints(constraints)


    def execute(self, iterations=None, test_length=None, profile_only=None,
                _get_time=time.time, postprocess_profiled_run=None,
                constraints=(), *args, **dargs):
        """
        This is the basic execute method for tests inheriting from base_test.
        If you are implementing a benchmark test, implement a run_once()
        method instead so that it cooperates with the profiling
        infrastructure. For other tests, you can simply override this default
        implementation.

        @param test_length: The minimum test length in seconds. run_once() is
            invoked as many times as needed to cover the minimum test length.

        @param iterations: The number of times to run the run_once function.
            This parameter is incompatible with test_length and is ignored
            (with a logged message) if you specify both.

        @param profile_only: If True, run all iterations with profilers
            enabled. If False, run the iterations normally and add one extra
            profiled run if any profilers are present. If None, default to the
            value of job.default_profile_only.

        @param _get_time: [time.time] Used for unit test time injection.

        @param postprocess_profiled_run: Run the postprocessing for the
            profiled run.
        """


        # For our special class of tests, the benchmarks, we don't want
        # profilers to run during the test iterations. Let's reserve only
        # the last iteration for profiling, if needed. So let's stop
        # all profilers if they are present and active.
        profilers = self.job.profilers
        if profilers.active():
            profilers.stop(self)
        if profile_only is None:
            profile_only = self.job.default_profile_only
        # If the user called this test in an odd way (specified both iterations
        # and test_length), let's warn them.
        if iterations and test_length:
            logging.info('Iterations parameter ignored (timed execution).')
        if test_length:
            test_start = _get_time()
            time_elapsed = 0
            timed_counter = 0
            logging.info('Test started. Minimum test length: %d s',
                         test_length)
            while time_elapsed < test_length:
                timed_counter = timed_counter + 1
                if time_elapsed == 0:
                    logging.info('Executing iteration %d', timed_counter)
                elif time_elapsed > 0:
                    logging.info(
                            'Executing iteration %d, time_elapsed %d s',
                            timed_counter, time_elapsed)
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
                test_iteration_finish = _get_time()
                time_elapsed = test_iteration_finish - test_start
            logging.info('Test finished after %d iterations', timed_counter)
            logging.info('Time elapsed: %d s', time_elapsed)
        else:
            if iterations is None:
                iterations = 1
            logging.info('Test started. Number of iterations: %d', iterations)
            for self.iteration in xrange(1, iterations+1):
                logging.info('Executing iteration %d of %d', self.iteration,
                             iterations)
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
            logging.info('Test finished after %d iterations.', iterations)

        if not profile_only:
            self.iteration += 1
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)

        # Do any postprocessing, normally extracting performance keyvals, etc
        self.postprocess()
        self.process_failed_constraints()
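    # A hedged sketch of how execute() is usually reached (the test name and
    # keyword arguments are illustrative): a control file calls
    #     job.run_test('sleeptest', iterations=3)
    #     job.run_test('sleeptest', test_length=60, seconds=1)
    # and the extra keyword arguments are cherry-picked and forwarded to
    # initialize(), setup(), warmup(), run_once() and cleanup() as each of
    # them accepts.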


    def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
        profilers = self.job.profilers
        # Do a profiling run if necessary
        if profilers.present():
            self.drop_caches_between_iterations()
            profilers.before_start(self)

            self.before_run_once()
            profilers.start(self)
            print 'Profilers present. Profiling run started'

            try:
                self.run_once(*args, **dargs)

                # The postprocess_profiled_run argument takes priority over
                # the test attribute of the same name.
                postprocess_attribute = getattr(self,
                                                'postprocess_profiled_run',
                                                False)

                if (postprocess_profiled_run or
                    (postprocess_profiled_run is None and
                     postprocess_attribute)):
                    self.postprocess_iteration()

            finally:
                profilers.stop(self)
                profilers.report(self)

            self.after_run_once()


    def postprocess(self):
        pass


    def postprocess_iteration(self):
        pass


    def cleanup(self):
        pass


    def before_run_once(self):
        """
        Override in tests that need it; called before every run_once() call,
        including the profiling run (in which case it runs before the
        profilers are started).
        """
        pass


    def after_run_once(self):
        """
        Called after every run_once() call (in a profiled run it is called
        after the profilers are stopped).
        """
        pass


    def _exec(self, args, dargs):
        self.job.logging.tee_redirect_debug_dir(self.debugdir,
                                                log_name=self.tagged_testname)
        try:
            if self.network_destabilizing:
                self.job.disable_warnings("NETWORK")

            # write out the test attributes into a keyval
            dargs   = dargs.copy()
            run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
            keyvals = dargs.pop('test_attributes', {}).copy()
            keyvals['version'] = self.version
            for i, arg in enumerate(args):
                keyvals['param-%d' % i] = repr(arg)
            for name, arg in dargs.iteritems():
                keyvals['param-%s' % name] = repr(arg)
            self.write_test_keyval(keyvals)

            _validate_args(args, dargs, self.initialize, self.setup,
                           self.execute, self.cleanup)

            try:
                # Initialize:
                _cherry_pick_call(self.initialize, *args, **dargs)

                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_EX)
                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup, args, dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)
                finally:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                    lockfile.close()

                # Execute:
                os.chdir(self.outputdir)

                # call self.warmup cherry picking the arguments it accepts and
                # translate exceptions if needed
                _call_test_function(_cherry_pick_call, self.warmup,
                                    *args, **dargs)

                if hasattr(self, 'run_once'):
                    p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                        args, dargs)
                    # pull in any non-* and non-** args from self.execute
                    for param in _get_nonstar_args(self.execute):
                        if param in dargs:
                            p_dargs[param] = dargs[param]
                else:
                    p_args, p_dargs = _cherry_pick_args(self.execute,
                                                        args, dargs)

                _call_test_function(self.execute, *p_args, **p_dargs)
            except Exception:
                try:
                    logging.exception('Exception escaping from test:')
                except:
                    pass # don't let logging exceptions here interfere

                # Save the exception while we run our cleanup() before
                # reraising it.
                exc_info = sys.exc_info()
                try:
                    try:
                        if run_cleanup:
                            _cherry_pick_call(self.cleanup, *args, **dargs)
                    except Exception:
                        print 'Ignoring exception during cleanup() phase:'
                        traceback.print_exc()
                        print 'Now raising the earlier %s error' % exc_info[0]
                    self.crash_handler_report()
                finally:
                    self.job.logging.restore()
                    try:
                        raise exc_info[0], exc_info[1], exc_info[2]
                    finally:
                        # http://docs.python.org/library/sys.html#sys.exc_info
                        # Be nice and prevent a circular reference.
                        del exc_info
            else:
                try:
                    if run_cleanup:
                        _cherry_pick_call(self.cleanup, *args, **dargs)
                    self.crash_handler_report()
                finally:
                    self.job.logging.restore()
        except error.AutotestError:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Pass already-categorized errors on up.
            raise
        except Exception, e:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Anything else is an ERROR in our own code, not execute().
            raise error.UnhandledTestError(e)
        else:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")


def _get_nonstar_args(func):
    """Extract all the (normal) function parameter names.

    Given a function, returns a tuple of parameter names, specifically
    excluding the * and ** parameters, if the function accepts them.

    @param func: A callable that we want to choose arguments for.

    @return: A tuple of parameters accepted by the function.
    """
    return func.func_code.co_varnames[:func.func_code.co_argcount]
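# A quick illustration (hypothetical function, not part of this module):
#     def run_once(self, seconds=1, *args, **kwargs):
#         pass
#     _get_nonstar_args(run_once)  # -> ('self', 'seconds')
# The *args/**kwargs names are excluded because co_argcount only counts the
# normal (non-star) parameters.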


def _cherry_pick_args(func, args, dargs):
    """Sanitize positional and keyword arguments before calling a function.

    Given a callable (func), an argument tuple and a dictionary of keyword
    arguments, pick only those arguments which the function is prepared to
    accept and return a new argument tuple and keyword argument dictionary.

    Args:
      func: A callable that we want to choose arguments for.
      args: A tuple of positional arguments to consider passing to func.
      dargs: A dictionary of keyword arguments to consider passing to func.
    Returns:
      A tuple of: (args tuple, keyword arguments dictionary)
    """
    # Cherry pick args:
    if func.func_code.co_flags & 0x04:
        # func accepts *args (CO_VARARGS is set), so return the entire args.
        p_args = args
    else:
        p_args = ()

    # Cherry pick dargs:
    if func.func_code.co_flags & 0x08:
        # func accepts **dargs (CO_VARKEYWORDS is set), so return the
        # entire dargs.
        p_dargs = dargs
    else:
        # Only return the keyword arguments that func accepts.
        p_dargs = {}
        for param in _get_nonstar_args(func):
            if param in dargs:
                p_dargs[param] = dargs[param]

    return p_args, p_dargs
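# A brief illustration (hypothetical callable, not part of this module):
#     def initialize(self, mountpoint, fstype='ext4'):
#         pass
#     _cherry_pick_args(initialize, (), {'fstype': 'xfs', 'unused': 1})
# returns ((), {'fstype': 'xfs'}); 'unused' is dropped because initialize()
# neither names that parameter nor accepts **kwargs.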


def _cherry_pick_call(func, *args, **dargs):
    """Cherry picks arguments from args/dargs based on what "func" accepts
    and calls the function with the picked arguments."""
    p_args, p_dargs = _cherry_pick_args(func, args, dargs)
    return func(*p_args, **p_dargs)


def _validate_args(args, dargs, *funcs):
    """Verify that arguments are appropriate for at least one callable.

    Given a list of callables as additional parameters, verify that
    the proposed keyword arguments in dargs will each be accepted by at least
    one of the callables.

    NOTE: args is currently not supported and must be empty.

    Args:
      args: A tuple of proposed positional arguments.
      dargs: A dictionary of proposed keyword arguments.
      *funcs: Callables to be searched for acceptance of args and dargs.
    Raises:
      error.AutotestError: if an arg won't be accepted by any of *funcs.
    """
    all_co_flags = 0
    all_varnames = ()
    for func in funcs:
        all_co_flags |= func.func_code.co_flags
        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]

    # Check if the given args belong to at least one of the methods below.
    if len(args) > 0:
        # Current implementation doesn't allow the use of args.
        raise error.TestError('Unnamed arguments not accepted. Please '
                              'call job.run_test with named args only')

    # Check if the given dargs belong to at least one of the methods below.
    if len(dargs) > 0:
        if not all_co_flags & 0x08:
            # No func accepts **dargs, so every param must be named explicitly
            # by at least one of the callables.
            for param in dargs:
                if param not in all_varnames:
                    raise error.AutotestError('Unknown parameter: %s' % param)


def _installtest(job, url):
    (group, name) = job.pkgmgr.get_package_name(url, 'test')

    # Bail if the test is already installed
    group_dir = os.path.join(job.testdir, "download", group)
    if os.path.exists(os.path.join(group_dir, name)):
        return (group, name)

    # If the group directory is missing, create it and add an empty
    # __init__.py so that sub-directories are considered for import.
    if not os.path.exists(group_dir):
        os.makedirs(group_dir)
        f = open(os.path.join(group_dir, '__init__.py'), 'w+')
        f.close()

    print name + ": installing test url=" + url
    tarball = os.path.basename(url)
    tarball_path = os.path.join(group_dir, tarball)
    test_dir = os.path.join(group_dir, name)
    job.pkgmgr.fetch_pkg(tarball, tarball_path,
                         repo_url=os.path.dirname(url))

    # Create the directory for the test
    if not os.path.exists(test_dir):
        os.mkdir(os.path.join(group_dir, name))

    job.pkgmgr.untar_pkg(tarball_path, test_dir)

    os.remove(tarball_path)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(group_dir, name,
                                          '__init__.py'))

    # The test is now installed.
    return (group, name)
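# As a rough illustration (the URL and names are hypothetical), installing
# http://example.com/tests/mygroup/mytest.tar.bz2 as group 'mygroup' and
# test 'mytest' would leave:
#     <job.testdir>/download/mygroup/__init__.py
#     <job.testdir>/download/mygroup/mytest/mytest.py        (from the tarball)
#     <job.testdir>/download/mygroup/mytest/__init__.py -> mytest.py
# so that runtest() below can import it as the module 'mygroup.mytest'.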


def _call_test_function(func, *args, **dargs):
    """Calls a test function and translates exceptions so that errors
    inside test code are considered test failures."""
    try:
        return func(*args, **dargs)
    except error.AutotestError:
        # Pass already-categorized errors on up as is.
        raise
    except Exception, e:
        # Other exceptions must be treated as a FAIL when
        # raised during the test functions
        raise error.UnhandledTestFail(e)


def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={},
            before_test_hook=None, after_test_hook=None,
            before_iteration_hook=None, after_iteration_hook=None):
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()

    # If this is not a plain test name, then download and install the
    # specified test.
    if url.endswith('.tar.bz2'):
        (testgroup, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', testgroup, testname)
        importdir = os.path.join(job.testdir, 'download')
        site_bindir = None
        modulename = '%s.%s' % (re.sub('/', '.', testgroup), testname)
        classname = '%s.%s' % (modulename, testname)
        path = testname
    else:
        # If the test is local, it may be under either testdir or site_testdir.
        # Tests in site_testdir override tests defined in testdir.
        testname = path = url
        testgroup = ''
        path = re.sub(':', '/', testname)
        modulename = os.path.basename(path)
        classname = '%s.%s' % (modulename, modulename)

        # Try installing the test package.
        # The job object may be either a server-side job or a client-side job;
        # the 'install_pkg' method will be present only if it's a client-side
        # job.
        if hasattr(job, 'install_pkg'):
            try:
                bindir = os.path.join(job.testdir, testname)
                job.install_pkg(testname, 'test', bindir)
            except error.PackageInstallError, e:
                # Continue as a fallback mechanism and see if the test code
                # already exists on the machine.
                pass

        bindir = testdir = None
        for dir in [job.testdir, getattr(job, 'site_testdir', None)]:
            if dir is not None and os.path.exists(os.path.join(dir, path)):
                testdir = dir
                importdir = bindir = os.path.join(dir, path)
        if not bindir:
            raise error.TestError(testname + ': test does not exist')

    outputdir = os.path.join(job.resultdir, testname)
    if tag:
        outputdir += '.' + tag

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    sys.path.insert(0, importdir)
    try:
        exec ('import %s' % modulename, local_namespace, global_namespace)
        exec ("mytest = %s(job, bindir, outputdir)" % classname,
              local_namespace, global_namespace)
    finally:
        sys.path.pop(0)

    pwd = os.getcwd()
    os.chdir(outputdir)

    try:
        mytest = global_namespace['mytest']
        if before_test_hook:
            before_test_hook(mytest)

        # Use the iteration-hook registration methods to register the
        # passed-in hooks.
        if before_iteration_hook:
            mytest.register_before_iteration_hook(before_iteration_hook)
        if after_iteration_hook:
            mytest.register_after_iteration_hook(after_iteration_hook)
        mytest._exec(args, dargs)
    finally:
        os.chdir(pwd)
        if after_test_hook:
            after_test_hook(mytest)
        shutil.rmtree(mytest.tmpdir, ignore_errors=True)

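# A hedged usage sketch (the test name and arguments are illustrative):
# runtest() is normally invoked by the job harness rather than called
# directly, roughly as
#     runtest(job, 'sleeptest', tag='smoke', args=(), dargs={'seconds': 1})
# which imports <job.testdir>/sleeptest/sleeptest.py, instantiates
# sleeptest(job, bindir, outputdir) and then runs mytest._exec(args, dargs).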