test.py revision 5e703a286f9f112c419a27402b232e78bfd7bdc8
# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialize
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       run             run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       e.g. results/<job>/<testname.tag>
#       resultsdir      e.g. results/<job>/<testname.tag>/results
#       profdir         e.g. results/<job>/<testname.tag>/profiling
#       debugdir        e.g. results/<job>/<testname.tag>/debug
#       bindir          e.g. tests/<test>
#       src             e.g. tests/<test>/src
#       tmpdir          e.g. tmp/<tempname>_<testname.tag>
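#
# A minimal client test built on this class might look like the sketch below
# (informal; the 'sleeptest' name and the 'test.test' import path are
# illustrative assumptions, not defined in this file):
#
#       from autotest_lib.client.bin import test
#
#       class sleeptest(test.test):
#           version = 1
#
#           def run_once(self, seconds=1):
#               import time
#               time.sleep(seconds)
#
# job.run_test('sleeptest', seconds=5) would then install and run it once.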

import fcntl, os, sys, shutil, tempfile, time, traceback
import logging

from autotest_lib.client.common_lib import error, packages
from autotest_lib.client.bin import utils


class base_test:
    preserve_srcdir = False
    network_destabilizing = False

    def __init__(self, job, bindir, outputdir):
        self.job = job
        self.pkgmgr = job.pkgmgr
        self.autodir = job.autodir

        self.outputdir = outputdir
        self.tagged_testname = os.path.basename(self.outputdir)
        self.resultsdir = os.path.join(self.outputdir, 'results')
        os.mkdir(self.resultsdir)
        self.profdir = os.path.join(self.outputdir, 'profiling')
        os.mkdir(self.profdir)
        self.debugdir = os.path.join(self.outputdir, 'debug')
        os.mkdir(self.debugdir)
        self.bindir = bindir
        if hasattr(job, 'libdir'):
            self.libdir = job.libdir
        self.srcdir = os.path.join(self.bindir, 'src')
        self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname,
                                       dir=job.tmpdir)
        self._keyvals = []
        self._new_keyval = False
        self.failed_constraints = []
        self.iteration = 0
        self.before_iteration_hooks = []
        self.after_iteration_hooks = []


    def assert_(self, expr, msg='Assertion failed.'):
        if not expr:
            raise error.TestError(msg)


    def write_test_keyval(self, attr_dict):
        utils.write_keyval(self.outputdir, attr_dict)


    @staticmethod
    def _append_type_to_keys(dictionary, typename):
        new_dict = {}
        for key, value in dictionary.iteritems():
            new_key = "%s{%s}" % (key, typename)
            new_dict[new_key] = value
        return new_dict
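
    # For example (informal): _append_type_to_keys({'throughput': 100}, 'perf')
    # returns {'throughput{perf}': 100}, the "key{type}" naming used by
    # write_iteration_keyval() below when it records keyvals.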


    def write_perf_keyval(self, perf_dict):
        self.write_iteration_keyval({}, perf_dict)


    def write_attr_keyval(self, attr_dict):
        self.write_iteration_keyval(attr_dict, {})


    def write_iteration_keyval(self, attr_dict, perf_dict):
        # append the dictionaries before they have the {perf} and {attr} added
        self._keyvals.append({'attr': attr_dict, 'perf': perf_dict})
        self._new_keyval = True

        if attr_dict:
            attr_dict = self._append_type_to_keys(attr_dict, "attr")
            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr")

        if perf_dict:
            perf_dict = self._append_type_to_keys(perf_dict, "perf")
            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf")

        # write a blank line as an iteration separator, closing the file
        # instead of leaking the handle returned by open()
        keyval_path = os.path.join(self.resultsdir, "keyval")
        keyval_file = open(keyval_path, "a")
        try:
            print >> keyval_file, ""
        finally:
            keyval_file.close()
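
    # Usage sketch (informal): inside postprocess_iteration() a test might
    # record one iteration's results like this, assuming it has computed a
    # hypothetical 'mb_per_sec' value:
    #
    #     self.write_perf_keyval({'mb_per_sec': mb_per_sec})
    #
    # which lands in results/<job>/<testname.tag>/results/keyval as
    # "mb_per_sec{perf}=...".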


    def analyze_perf_constraints(self, constraints):
        if not self._new_keyval:
            return

        self._new_keyval = False
        failures = []
        for constraint in constraints:
            logging.debug('checking constraint %s against keyvals %s',
                          constraint, self._keyvals[-1]['perf'])
            try:
                if not eval(constraint, self._keyvals[-1]['perf']):
                    failures.append('%s: constraint was not met' % constraint)
            except Exception:
                failures.append('could not evaluate constraint: %s'
                                % constraint)

        # keep track of the errors for each iteration
        self.failed_constraints.append(failures)


    def process_failed_constraints(self):
        msg = ''
        for i, failures in enumerate(self.failed_constraints):
            if failures:
                msg += 'iteration %d:%s  ' % (i, ','.join(failures))

        if msg:
            raise error.TestFail(msg)


    def register_before_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register a before_iteration_hook.
        This adds the method to the list of hooks which are executed
        before each iteration.

        @param iteration_hook: Method to run before each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.before_iteration_hooks.append(iteration_hook)


    def register_after_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register an after_iteration_hook.
        This adds the method to the list of hooks which are executed
        after each iteration.

        @param iteration_hook: Method to run after each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.after_iteration_hooks.append(iteration_hook)
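
    # Hedged example: a hook is any callable taking the test instance. A test
    # (or a runtest() caller) might register one like this; '_log_iteration'
    # is an illustrative name, not part of this module:
    #
    #     def _log_iteration(test):
    #         logging.info('about to run iteration %d of %s',
    #                      test.iteration, test.tagged_testname)
    #
    #     mytest.register_before_iteration_hook(_log_iteration)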


    def initialize(self):
        pass


    def setup(self):
        pass


    def warmup(self, *args, **dargs):
        pass


    def drop_caches_between_iterations(self):
        if self.job.drop_caches_between_iterations:
            logging.info('Dropping caches between iterations')
            utils.drop_caches()


    def _call_run_once(self, constraints, profile_only,
                       postprocess_profiled_run, args, dargs):
        self.drop_caches_between_iterations()

        # execute iteration hooks
        for hook in self.before_iteration_hooks:
            hook(self)

        if profile_only:
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)
        else:
            self.run_once(*args, **dargs)

        for hook in self.after_iteration_hooks:
            hook(self)

        self.postprocess_iteration()
        self.analyze_perf_constraints(constraints)


    def execute(self, iterations=None, test_length=None, profile_only=False,
                _get_time=time.time, postprocess_profiled_run=None,
                constraints=(), *args, **dargs):
        """
        This is the basic execute method for the tests inherited from
        base_test. Benchmark tests should implement run_once(), which this
        method wraps with the profiling infrastructure; other tests can simply
        override this default implementation.

        @param test_length: The minimum test length in seconds. run_once will
            be called as many times as needed to cover at least this much
            elapsed time.

        @param iterations: The number of times to call the run_once function.
            This parameter is incompatible with test_length and will be
            silently ignored if both are specified.

        @param profile_only: If True, run all iterations with profilers
            enabled. Otherwise run the iterations unprofiled, plus one extra
            profiled iteration if any profilers are present.

        @param _get_time: [time.time] Used for unit test time injection.

        @param postprocess_profiled_run: Run the postprocessing for the
            profiled run.
        """

        # For our special class of tests, the benchmarks, we don't want
        # profilers to run during the test iterations. Let's reserve only
        # the last iteration for profiling, if needed. So let's stop
        # all profilers if they are present and active.
        profilers = self.job.profilers
        if profilers.active():
            profilers.stop(self)
        # If the user called this test in an odd way (specified both iterations
        # and test_length), let's warn them.
        if iterations and test_length:
            logging.info('Iterations parameter ignored (timed execution).')
        if test_length:
            test_start = _get_time()
            time_elapsed = 0
            timed_counter = 0
            logging.info('Test started. Minimum test length: %d s',
                         test_length)
            while time_elapsed < test_length:
                timed_counter += 1
                if time_elapsed == 0:
                    logging.info('Executing iteration %d', timed_counter)
                else:
                    logging.info('Executing iteration %d, time_elapsed %d s',
                                 timed_counter, time_elapsed)
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
                test_iteration_finish = _get_time()
                time_elapsed = test_iteration_finish - test_start
            logging.info('Test finished after %d iterations', timed_counter)
            logging.info('Time elapsed: %d s', time_elapsed)
        else:
            if iterations is None:
                iterations = 1
            logging.info('Test started. Number of iterations: %d', iterations)
            for self.iteration in xrange(1, iterations+1):
                logging.info('Executing iteration %d of %d', self.iteration,
                             iterations)
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
            logging.info('Test finished after %d iterations.', iterations)

        if not profile_only:
            self.iteration += 1
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)

        # Do any postprocessing, normally extracting performance keyvals, etc
        self.postprocess()
        self.process_failed_constraints()
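
    # Illustrative calls (informal), as they might appear in a control file
    # via job.run_test, which eventually reaches this method:
    #
    #     job.run_test('sleeptest', iterations=3)        # three fixed runs
    #     job.run_test('sleeptest', test_length=60)      # run for >= 60 s
    #
    # 'sleeptest' is an assumed test name used only for illustration.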


    def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
        profilers = self.job.profilers
        # Do a profiling run if necessary
        if profilers.present():
            self.drop_caches_between_iterations()
            profilers.start(self)
            logging.info('Profilers present. Profiling run started')
            try:
                self.run_once(*args, **dargs)

                # The postprocess_profiled_run argument takes priority over
                # the attribute of the same name.
                postprocess_attribute = getattr(self,
                                                'postprocess_profiled_run',
                                                False)

                if (postprocess_profiled_run or
                    (postprocess_profiled_run is None and
                     postprocess_attribute)):
                    self.postprocess_iteration()

            finally:
                profilers.stop(self)
                profilers.report(self)


    def postprocess(self):
        pass


    def postprocess_iteration(self):
        pass


    def cleanup(self):
        pass


    def _setup_test_logging_handler(self):
        """
        Adds a file handler during test execution, so that test writers can
        record messages into the test results dir just by making logging
        calls.
        """
        result_filename = os.path.join(self.resultsdir,
                                       '%s.log' % self.tagged_testname)
        self.test_handler = logging.FileHandler(filename=result_filename,
                                                mode='w')
        fmt_str = '[%(asctime)s - %(module)-15s - %(levelname)-8s] %(message)s'
        self.test_formatter = logging.Formatter(fmt_str)
        self.test_handler.setFormatter(self.test_formatter)
        self.logger = logging.getLogger()
        self.logger.addHandler(self.test_handler)
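
    # With the handler installed, an ordinary logging call inside a test,
    # e.g. (informal sketch, 'nbytes' is a hypothetical value):
    #
    #     logging.info('wrote %d bytes', nbytes)
    #
    # is appended to results/<job>/<testname.tag>/results/<testname.tag>.log
    # in the format configured above.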


    def _exec(self, args, dargs):
        self.job.logging.tee_redirect_debug_dir(self.debugdir)
        self._setup_test_logging_handler()
        try:
            if self.network_destabilizing:
                self.job.disable_warnings("NETWORK")

            # write out the test attributes into a keyval
            dargs = dargs.copy()
            run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
            keyvals = dargs.pop('test_attributes', {}).copy()
            keyvals['version'] = self.version
            for i, arg in enumerate(args):
                keyvals['param-%d' % i] = repr(arg)
            for name, arg in dargs.iteritems():
                keyvals['param-%s' % name] = repr(arg)
            self.write_test_keyval(keyvals)

            _validate_args(args, dargs, self.initialize, self.setup,
                           self.execute, self.cleanup)

            try:
                # Initialize:
                _cherry_pick_call(self.initialize, *args, **dargs)

                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_EX)
                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup, args, dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)
                finally:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                    lockfile.close()

                # Execute:
                os.chdir(self.outputdir)

                # call self.warmup cherry picking the arguments it accepts and
                # translate exceptions if needed
                _call_test_function(_cherry_pick_call, self.warmup,
                                    *args, **dargs)

                if hasattr(self, 'run_once'):
                    p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                        args, dargs)
                    # pull in any non-* and non-** args from self.execute
                    for param in _get_nonstar_args(self.execute):
                        if param in dargs:
                            p_dargs[param] = dargs[param]
                else:
                    p_args, p_dargs = _cherry_pick_args(self.execute,
                                                        args, dargs)

                _call_test_function(self.execute, *p_args, **p_dargs)
            except Exception:
                # Save the exception while we run our cleanup() before
                # reraising it.
                exc_info = sys.exc_info()
                try:
                    try:
                        if run_cleanup:
                            _cherry_pick_call(self.cleanup, *args, **dargs)
                    except Exception:
                        logging.error('Ignoring exception during cleanup() '
                                      'phase:')
                        traceback.print_exc()
                        logging.error('Now raising the earlier %s error',
                                      exc_info[0])
                finally:
                    # Remove the test log handler on the failure path too,
                    # mirroring the success path below.
                    self.logger.removeHandler(self.test_handler)
                    self.job.logging.restore()
                    try:
                        raise exc_info[0], exc_info[1], exc_info[2]
                    finally:
                        # http://docs.python.org/library/sys.html#sys.exc_info
                        # Be nice and prevent a circular reference.
                        del exc_info
            else:
                try:
                    if run_cleanup:
                        _cherry_pick_call(self.cleanup, *args, **dargs)
                finally:
                    self.logger.removeHandler(self.test_handler)
                    self.job.logging.restore()
        except error.AutotestError:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Pass already-categorized errors on up.
            raise
        except Exception, e:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Anything else is an ERROR in our own code, not execute().
            raise error.UnhandledTestError(e)
        else:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")


def _get_nonstar_args(func):
    """Extract all the (normal) function parameter names.

    Given a function, returns a tuple of parameter names, specifically
    excluding the * and ** parameters, if the function accepts them.

    @param func: A callable that we want to choose arguments for.

    @return: A tuple of parameters accepted by the function.
    """
    return func.func_code.co_varnames[:func.func_code.co_argcount]
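
# For instance (informal): given "def f(a, b, *args, **kwargs): pass",
# _get_nonstar_args(f) returns ('a', 'b').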


def _cherry_pick_args(func, args, dargs):
    """Sanitize positional and keyword arguments before calling a function.

    Given a callable (func), an argument tuple and a dictionary of keyword
    arguments, pick only those arguments which the function is prepared to
    accept and return a new argument tuple and keyword argument dictionary.

    Args:
      func: A callable that we want to choose arguments for.
      args: A tuple of positional arguments to consider passing to func.
      dargs: A dictionary of keyword arguments to consider passing to func.
    Returns:
      A tuple of: (args tuple, keyword arguments dictionary)
    """
    # Cherry pick args:
    if func.func_code.co_flags & 0x04:
        # func accepts *args (CO_VARARGS is set), so return the entire args.
        p_args = args
    else:
        p_args = ()

    # Cherry pick dargs:
    if func.func_code.co_flags & 0x08:
        # func accepts **dargs (CO_VARKEYWORDS is set), so return the
        # entire dargs.
        p_dargs = dargs
    else:
        # Only return the keyword arguments that func accepts.
        p_dargs = {}
        for param in _get_nonstar_args(func):
            if param in dargs:
                p_dargs[param] = dargs[param]

    return p_args, p_dargs
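
# A quick illustration (informal): with "def g(x, **kw): pass",
# _cherry_pick_args(g, (1, 2), {'x': 3, 'y': 4}) returns
# ((), {'x': 3, 'y': 4}), because g has no *args (the positional arguments
# are dropped) but does accept **kw (all keyword arguments pass through).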


def _cherry_pick_call(func, *args, **dargs):
    """Cherry picks arguments from args/dargs based on what "func" accepts
    and calls the function with the picked arguments."""
    p_args, p_dargs = _cherry_pick_args(func, args, dargs)
    return func(*p_args, **p_dargs)


def _validate_args(args, dargs, *funcs):
    """Verify that arguments are appropriate for at least one callable.

    Given a list of callables as additional parameters, verify that
    the proposed keyword arguments in dargs will each be accepted by at least
    one of the callables.

    NOTE: args is currently not supported and must be empty.

    Args:
      args: A tuple of proposed positional arguments.
      dargs: A dictionary of proposed keyword arguments.
      *funcs: Callables to be searched for acceptance of args and dargs.
    Raises:
      error.AutotestError: if an arg won't be accepted by any of *funcs.
    """
    all_co_flags = 0
    all_varnames = ()
    for func in funcs:
        all_co_flags |= func.func_code.co_flags
        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]

    # Check if the given args belong to at least one of the methods below.
    if len(args) > 0:
        # Current implementation doesn't allow the use of args.
        raise error.TestError('Unnamed arguments not accepted. Please '
                              'call job.run_test with named args only')

    # Check if the given dargs belong to at least one of the methods below.
    if len(dargs) > 0:
        if not all_co_flags & 0x08:
            # no func accepts **dargs, so:
            for param in dargs:
                if param not in all_varnames:
                    raise error.AutotestError('Unknown parameter: %s' % param)
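
# For example (informal): given "def f(x): pass" and "def g(y): pass",
# _validate_args((), {'z': 1}, f, g) raises
# error.AutotestError('Unknown parameter: z'), since neither callable accepts
# **kwargs or a parameter named 'z'. Note that if any callable takes **kwargs
# (as base_test.execute does), the keyword check is skipped entirely.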


def _installtest(job, url):
    (group, name) = job.pkgmgr.get_package_name(url, 'test')

    # Bail if the test is already installed
    group_dir = os.path.join(job.testdir, "download", group)
    if os.path.exists(os.path.join(group_dir, name)):
        return (group, name)

    # If the group directory is missing create it and add
    # an empty __init__.py so that sub-directories are
    # considered for import.
    if not os.path.exists(group_dir):
        os.mkdir(group_dir)
        f = open(os.path.join(group_dir, '__init__.py'), 'w+')
        f.close()

    logging.info("%s: installing test url=%s", name, url)
    tarball = os.path.basename(url)
    tarball_path = os.path.join(group_dir, tarball)
    test_dir = os.path.join(group_dir, name)
    job.pkgmgr.fetch_pkg(tarball, tarball_path,
                         repo_url=os.path.dirname(url))

    # Create the directory for the test
    if not os.path.exists(test_dir):
        os.mkdir(test_dir)

    job.pkgmgr.untar_pkg(tarball_path, test_dir)

    os.remove(tarball_path)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(group_dir, name, '__init__.py'))

    # The test is now installed.
    return (group, name)


def _call_test_function(func, *args, **dargs):
    """Calls a test function and translates exceptions so that errors
    inside test code are considered test failures."""
    try:
        return func(*args, **dargs)
    except error.AutotestError:
        # Pass already-categorized errors on up as is.
        raise
    except Exception, e:
        # Other exceptions must be treated as a FAIL when
        # raised during the test functions
        raise error.UnhandledTestFail(e)


def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={},
            before_test_hook=None, after_test_hook=None,
            before_iteration_hook=None, after_iteration_hook=None):
    """Locate (or download and install), import, instantiate and execute
    a test, placing its output under job.resultdir.

    The mutable default namespace arguments are safe here because they are
    copied immediately below.
    """
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()

    # if this is not a plain test name then download and install the
    # specified test
    if url.endswith('.tar.bz2'):
        (group, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', group, testname)
        site_bindir = None
    else:
        # if the test is local, it can be found in either testdir
        # or site_testdir. tests in site_testdir override tests
        # defined in testdir
        (group, testname) = ('', url)
        bindir = os.path.join(job.testdir, group, testname)
        if hasattr(job, 'site_testdir'):
            site_bindir = os.path.join(job.site_testdir,
                                       group, testname)
        else:
            site_bindir = None

        # The job object here can be that of a server side job or a client
        # side job. 'install_pkg' method won't be present for server side
        # jobs, so do the fetch only if that method is present in the job
        # obj.
        if hasattr(job, 'install_pkg'):
            try:
                job.install_pkg(testname, 'test', bindir)
            except packages.PackageInstallError:
                # continue as a fall back mechanism and see if the test code
                # already exists on the machine
                pass

    outputdir = os.path.join(job.resultdir, testname)
    if tag:
        outputdir += '.' + tag

    # if we can find the test in site_bindir, use this version
    if site_bindir and os.path.exists(site_bindir):
        bindir = site_bindir
        testdir = job.site_testdir
    elif os.path.exists(bindir):
        testdir = job.testdir
    else:
        raise error.TestError(testname + ': test does not exist')

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    if group:
        sys.path.insert(0, os.path.join(testdir, 'download'))
        group += '.'
    else:
        sys.path.insert(0, os.path.join(testdir, testname))

    try:
        exec ("import %s%s" % (group, testname),
              local_namespace, global_namespace)
        exec ("mytest = %s%s.%s(job, bindir, outputdir)" %
              (group, testname, testname),
              local_namespace, global_namespace)
    finally:
        sys.path.pop(0)

    pwd = os.getcwd()
    os.chdir(outputdir)

    try:
        mytest = global_namespace['mytest']
        if before_test_hook:
            before_test_hook(mytest)

        # we use the register iteration hooks methods to register the passed
        # in hooks
        if before_iteration_hook:
            mytest.register_before_iteration_hook(before_iteration_hook)
        if after_iteration_hook:
            mytest.register_after_iteration_hook(after_iteration_hook)
        mytest._exec(args, dargs)
    finally:
        os.chdir(pwd)
        if after_test_hook:
            after_test_hook(mytest)
        shutil.rmtree(mytest.tmpdir, ignore_errors=True)
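
# Typical invocation sketch (informal; the job object and 'sleeptest' name
# are assumptions for illustration): the harness calls something like
#
#     runtest(job, 'sleeptest', tag='smoke', args=(), dargs={'seconds': 2})
#
# which imports tests/sleeptest/sleeptest.py, instantiates
# sleeptest(job, bindir, outputdir) and runs its _exec() with output under
# results/<job>/sleeptest.smoke.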