test.py revision ee36bc777d44d641bb6664dcaf3f903936b0e3ee
# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialise
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       execute         run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       src             eg. tests/<test>/src
#       tmpdir          eg. tmp/<tempname>_<testname.tag>
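#
# A typical client test subclasses this class (usually via the client-side
# 'test.test' wrapper) and only needs to implement run_once(). The sketch
# below is purely illustrative; 'mytest' and its 'seconds' parameter are
# made-up names:
#
#       import time
#       from autotest_lib.client.bin import test
#
#       class mytest(test.test):
#           version = 1
#
#           def run_once(self, seconds=1):
#               time.sleep(seconds)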

import fcntl, os, re, sys, shutil, tarfile, tempfile, time, traceback
import warnings, logging

from autotest_lib.client.common_lib import error, packages
from autotest_lib.client.bin import utils


class base_test:
    preserve_srcdir = False
    network_destabilizing = False

    def __init__(self, job, bindir, outputdir):
        self.job = job
        self.pkgmgr = job.pkgmgr
        self.autodir = job.autodir

        self.outputdir = outputdir
        self.tagged_testname = os.path.basename(self.outputdir)
        self.resultsdir = os.path.join(self.outputdir, 'results')
        os.mkdir(self.resultsdir)
        self.profdir = os.path.join(self.outputdir, 'profiling')
        os.mkdir(self.profdir)
        self.debugdir = os.path.join(self.outputdir, 'debug')
        os.mkdir(self.debugdir)
        self.bindir = bindir
        if hasattr(job, 'libdir'):
            self.libdir = job.libdir
        self.srcdir = os.path.join(self.bindir, 'src')
        self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname,
                                       dir=job.tmpdir)
        self._keyvals = []
        self._new_keyval = False
        self.failed_constraints = []
        self.iteration = 0
        self.before_iteration_hooks = []
        self.after_iteration_hooks = []


    def assert_(self, expr, msg='Assertion failed.'):
        if not expr:
            raise error.TestError(msg)


    def write_test_keyval(self, attr_dict):
        utils.write_keyval(self.outputdir, attr_dict)


    @staticmethod
    def _append_type_to_keys(dictionary, typename):
        new_dict = {}
        for key, value in dictionary.iteritems():
            new_key = "%s{%s}" % (key, typename)
            new_dict[new_key] = value
        return new_dict


    def write_perf_keyval(self, perf_dict):
        self.write_iteration_keyval({}, perf_dict)


    def write_attr_keyval(self, attr_dict):
        self.write_iteration_keyval(attr_dict, {})


    def write_iteration_keyval(self, attr_dict, perf_dict):
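        """Records one iteration's attribute and perf keyvals.

        Each key is written to <resultsdir>/keyval with an '{attr}' or
        '{perf}' suffix appended to its name, and a blank line separates
        successive iterations in that file.
        """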
        # append the dictionaries before they have the {perf} and {attr} added
        self._keyvals.append({'attr':attr_dict, 'perf':perf_dict})
        self._new_keyval = True

        if attr_dict:
            attr_dict = self._append_type_to_keys(attr_dict, "attr")
            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr")

        if perf_dict:
            perf_dict = self._append_type_to_keys(perf_dict, "perf")
            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf")

        keyval_path = os.path.join(self.resultsdir, "keyval")
        print >> open(keyval_path, "a"), ""


    def analyze_perf_constraints(self, constraints):
        if not self._new_keyval:
            return

        self._new_keyval = False
        failures = []
        for constraint in constraints:
            logging.debug('constraint = %s', constraint)
            logging.debug('keyvals = %s', self._keyvals[-1]['perf'])
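            # Each constraint is expected to be a Python expression (for
            # example, the hypothetical 'throughput > 500') that is evaluated
            # with this iteration's perf keyvals as its namespace.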
            try:
                if not eval(constraint, self._keyvals[-1]['perf']):
                    failures.append('%s: constraint was not met' % constraint)
            except:
                failures.append('could not evaluate constraint: %s'
                                % constraint)

        # keep track of the errors for each iteration
        self.failed_constraints.append(failures)


    def process_failed_constraints(self):
        msg = ''
        for i, failures in enumerate(self.failed_constraints):
            if failures:
                msg += 'iteration %d:%s  ' % (i, ','.join(failures))

        if msg:
            raise error.TestFail(msg)


    def register_before_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register a before_iteration_hook.
        This adds the method to the list of hooks which are executed
        before each iteration.

        @param iteration_hook: Method to run before each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
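
        For illustration, a hook is any callable that takes the test object
        as its only argument (the name 'log_iteration' below is made up):

            def log_iteration(test):
                logging.info('About to run iteration %d of %s',
                             test.iteration, test.tagged_testname)
            test.register_before_iteration_hook(log_iteration)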
        """
        self.before_iteration_hooks.append(iteration_hook)


    def register_after_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register an after_iteration_hook.
        This adds the method to the list of hooks which are executed
        after each iteration.

        @param iteration_hook: Method to run after each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.after_iteration_hooks.append(iteration_hook)


    def initialize(self):
        pass


    def setup(self):
        pass


    def warmup(self, *args, **dargs):
        pass


    def drop_caches_between_iterations(self):
        if self.job.drop_caches_between_iterations:
            print "Dropping caches between iterations"
            utils.drop_caches()


    def _call_run_once(self, constraints, profile_only,
                       postprocess_profiled_run, args, dargs):
        self.drop_caches_between_iterations()

        # execute iteration hooks
        for hook in self.before_iteration_hooks:
            hook(self)

        if profile_only:
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)
        else:
            self.run_once(*args, **dargs)

        for hook in self.after_iteration_hooks:
            hook(self)

        self.postprocess_iteration()
        self.analyze_perf_constraints(constraints)


    def execute(self, iterations=None, test_length=None, profile_only=False,
                _get_time=time.time, postprocess_profiled_run=None,
                constraints=(), *args, **dargs):
        """
        This is the basic execute method for tests that inherit from
        base_test. Benchmark tests should implement run_once so that they
        cooperate with the profiling infrastructure; other tests can simply
        override this default implementation.

        @param test_length: The minimum test length in seconds. run_once is
            called repeatedly until at least this much time has elapsed.

        @param iterations: The number of times to run the run_once function.
            This parameter is incompatible with test_length; if both are
            specified, a message is logged and timed execution is used.

        @param profile_only: If True, run every iteration with any configured
            profilers enabled. Otherwise run the iterations unprofiled and add
            one extra profiled iteration if profilers are present.

        @param _get_time: [time.time] Used for unit test time injection.

        @param postprocess_profiled_run: Whether to run postprocessing
            (postprocess_iteration) for the profiled run.

        @param constraints: A sequence of strings, each a Python expression
            evaluated against every iteration's perf keyvals; constraints that
            are not met cause the test to fail after all iterations complete.
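
        Example (illustrative only; 'mybench' is a hypothetical test and
        'throughput' a hypothetical perf keyval written by it):

            job.run_test('mybench', iterations=3,
                         constraints=['throughput > 500'])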
        """

        # For benchmarks we do not want profilers running during the measured
        # iterations; profiling is reserved for a separate run at the end, so
        # stop any profilers that are currently active.
        profilers = self.job.profilers
        if profilers.active():
            profilers.stop(self)
        # If the user called this test in an odd way (specified both iterations
        # and test_length), let's warn them.
        if iterations and test_length:
            logging.info('Iterations parameter ignored (timed execution).')
        if test_length:
            test_start = _get_time()
            time_elapsed = 0
            timed_counter = 0
            logging.info('Test started. Minimum test length: %d s',
                         test_length)
            while time_elapsed < test_length:
                timed_counter = timed_counter + 1
                if time_elapsed == 0:
                    logging.info('Executing iteration %d', timed_counter)
                elif time_elapsed > 0:
                    logging.info(
                            'Executing iteration %d, time_elapsed %d s',
                            timed_counter, time_elapsed)
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
                test_iteration_finish = _get_time()
                time_elapsed = test_iteration_finish - test_start
            logging.info('Test finished after %d iterations',
                         timed_counter)
            logging.info('Time elapsed: %d s', time_elapsed)
        else:
            if iterations is None:
                iterations = 1
            logging.info('Test started. Number of iterations: %d', iterations)
            for self.iteration in xrange(1, iterations+1):
                logging.info('Executing iteration %d of %d', self.iteration,
                             iterations)
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
            logging.info('Test finished after %d iterations.', iterations)

        if not profile_only:
            self.iteration += 1
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)

        # Do any postprocessing, normally extracting performance keyvals, etc
        self.postprocess()
        self.process_failed_constraints()


    def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
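        """Runs one iteration with any configured profilers started.

        If profilers are present, they are started around a single call to
        run_once(); postprocess_iteration() is then invoked when requested
        either by the postprocess_profiled_run argument or, if that argument
        is None, by a same-named attribute on the test object.
        """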
        profilers = self.job.profilers
        # Do a profiling run if necessary
        if profilers.present():
            self.drop_caches_between_iterations()
            profilers.start(self)
            print 'Profilers present. Profiling run started'
            try:
                self.run_once(*args, **dargs)

                # The postprocess_profiled_run argument takes priority over
                # the attribute of the same name; the attribute is only
                # consulted when the argument is None.
                postprocess_attribute = getattr(self,
                                                'postprocess_profiled_run',
                                                False)

                if (postprocess_profiled_run or
                    (postprocess_profiled_run is None and
                     postprocess_attribute)):
                    self.postprocess_iteration()

            finally:
                profilers.stop(self)
                profilers.report(self)


    def postprocess(self):
        pass


    def postprocess_iteration(self):
        pass


    def cleanup(self):
        pass


    def _exec(self, args, dargs):
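        """Top-level test execution entry point, called from runtest().

        Redirects logging to the test debug directory, records the test
        attribute keyvals, then runs initialize(), setup() (serialized with
        other tests via a lock file), warmup() and execute()/run_once(),
        restoring logging and running cleanup() on both success and failure.
        """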
        self.job.logging.tee_redirect_debug_dir(self.debugdir,
                                                log_name=self.tagged_testname)
        try:
            if self.network_destabilizing:
                self.job.disable_warnings("NETWORK")

            # write out the test attributes into a keyval
            dargs   = dargs.copy()
            run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
            keyvals = dargs.pop('test_attributes', {}).copy()
            keyvals['version'] = self.version
            for i, arg in enumerate(args):
                keyvals['param-%d' % i] = repr(arg)
            for name, arg in dargs.iteritems():
                keyvals['param-%s' % name] = repr(arg)
            self.write_test_keyval(keyvals)

            _validate_args(args, dargs, self.initialize, self.setup,
                           self.execute, self.cleanup)

            try:
                # Initialize:
                _cherry_pick_call(self.initialize, *args, **dargs)

                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_EX)
                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup, args, dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)
                finally:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                    lockfile.close()

                # Execute:
                os.chdir(self.outputdir)

                # call self.warmup cherry picking the arguments it accepts and
                # translate exceptions if needed
                _call_test_function(_cherry_pick_call, self.warmup,
                                    *args, **dargs)

                if hasattr(self, 'run_once'):
                    p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                        args, dargs)
                    # pull in any non-* and non-** args from self.execute
                    for param in _get_nonstar_args(self.execute):
                        if param in dargs:
                            p_dargs[param] = dargs[param]
                else:
                    p_args, p_dargs = _cherry_pick_args(self.execute,
                                                        args, dargs)

                _call_test_function(self.execute, *p_args, **p_dargs)
            except Exception:
                # Save the exception while we run our cleanup() before
                # reraising it.
                exc_info = sys.exc_info()
                try:
                    try:
                        if run_cleanup:
                            _cherry_pick_call(self.cleanup, *args, **dargs)
                    except Exception:
                        print 'Ignoring exception during cleanup() phase:'
                        traceback.print_exc()
                        print 'Now raising the earlier %s error' % exc_info[0]
                finally:
                    self.job.logging.restore()
                    try:
                        raise exc_info[0], exc_info[1], exc_info[2]
                    finally:
                        # http://docs.python.org/library/sys.html#sys.exc_info
                        # Be nice and prevent a circular reference.
                        del exc_info
            else:
                try:
                    if run_cleanup:
                        _cherry_pick_call(self.cleanup, *args, **dargs)
                finally:
                    self.job.logging.restore()
        except error.AutotestError:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Pass already-categorized errors on up.
            raise
        except Exception, e:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Anything else is an ERROR in our own code, not execute().
            raise error.UnhandledTestError(e)
        else:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")


def _get_nonstar_args(func):
    """Extract all the (normal) function parameter names.

    Given a function, returns a tuple of parameter names, specifically
    excluding the * and ** parameters, if the function accepts them.

    @param func: A callable that we want to choose arguments for.

    @return: A tuple of parameters accepted by the function.
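
    For example, given 'def f(a, b, *args, **dargs)' this returns ('a', 'b').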
    """
    return func.func_code.co_varnames[:func.func_code.co_argcount]


def _cherry_pick_args(func, args, dargs):
    """Sanitize positional and keyword arguments before calling a function.

    Given a callable (func), an argument tuple and a dictionary of keyword
    arguments, pick only those arguments which the function is prepared to
    accept and return a new argument tuple and keyword argument dictionary.

    Args:
      func: A callable that we want to choose arguments for.
      args: A tuple of positional arguments to consider passing to func.
      dargs: A dictionary of keyword arguments to consider passing to func.
    Returns:
      A tuple of: (args tuple, keyword arguments dictionary)
    """
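    # Note on the co_flags checks below: bit 0x04 (inspect.CO_VARARGS) is set
    # when func accepts *args, and bit 0x08 (inspect.CO_VARKEYWORDS) is set
    # when it accepts **kwargs.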
    # Cherry pick args:
    if func.func_code.co_flags & 0x04:
        # func accepts *args, so return the entire args.
        p_args = args
    else:
        p_args = ()

    # Cherry pick dargs:
    if func.func_code.co_flags & 0x08:
        # func accepts **dargs, so return the entire dargs.
        p_dargs = dargs
    else:
        # Only return the keyword arguments that func accepts.
        p_dargs = {}
        for param in _get_nonstar_args(func):
            if param in dargs:
                p_dargs[param] = dargs[param]

    return p_args, p_dargs


def _cherry_pick_call(func, *args, **dargs):
    """Cherry picks arguments from args/dargs based on what "func" accepts
    and calls the function with the picked arguments."""
    p_args, p_dargs = _cherry_pick_args(func, args, dargs)
    return func(*p_args, **p_dargs)


def _validate_args(args, dargs, *funcs):
    """Verify that arguments are appropriate for at least one callable.

    Given a list of callables as additional parameters, verify that
    the proposed keyword arguments in dargs will each be accepted by at least
    one of the callables.

    NOTE: args is currently not supported and must be empty.

    Args:
      args: A tuple of proposed positional arguments.
      dargs: A dictionary of proposed keyword arguments.
      *funcs: Callables to be searched for acceptance of args and dargs.
    Raises:
      error.AutotestError: if an arg won't be accepted by any of *funcs.
    """
    all_co_flags = 0
    all_varnames = ()
    for func in funcs:
        all_co_flags |= func.func_code.co_flags
        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]

    # Check if the given args belong to at least one of the callables.
    if len(args) > 0:
        # The current implementation doesn't allow the use of positional args.
        raise error.TestError('Unnamed arguments not accepted. Please '
                              'call job.run_test with named args only')

    # Check if the given dargs belong to at least one of the callables.
    if len(dargs) > 0:
        if not all_co_flags & 0x08:
            # No func accepts **dargs, so check each keyword argument against
            # the parameter names collected above.
            for param in dargs:
                if not param in all_varnames:
                    raise error.AutotestError('Unknown parameter: %s' % param)


def _installtest(job, url):
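    """Downloads and installs a test package so that it can be imported.

    The tarball named by url is fetched and unpacked into
    <job.testdir>/download/<group>/<name>, and an __init__.py symlink to the
    test's main module is created so it is importable as '<group>.<name>'.

    @return: A (group, name) tuple identifying the installed test.
    """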
    (group, name) = job.pkgmgr.get_package_name(url, 'test')

    # Bail if the test is already installed
    group_dir = os.path.join(job.testdir, "download", group)
    if os.path.exists(os.path.join(group_dir, name)):
        return (group, name)

    # If the group directory is missing, create it and add an empty
    # __init__.py so that sub-directories are considered for import.
    if not os.path.exists(group_dir):
        os.mkdir(group_dir)
        f = file(os.path.join(group_dir, '__init__.py'), 'w+')
        f.close()

    print name + ": installing test url=" + url
    tarball = os.path.basename(url)
    tarball_path = os.path.join(group_dir, tarball)
    test_dir = os.path.join(group_dir, name)
    job.pkgmgr.fetch_pkg(tarball, tarball_path,
                         repo_url=os.path.dirname(url))

    # Create the directory for the test
    if not os.path.exists(test_dir):
        os.mkdir(os.path.join(group_dir, name))

    job.pkgmgr.untar_pkg(tarball_path, test_dir)

    os.remove(tarball_path)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(group_dir, name,
                            '__init__.py'))

    # The test is now installed.
    return (group, name)


def _call_test_function(func, *args, **dargs):
    """Calls a test function and translates exceptions so that errors
    inside test code are considered test failures."""
    try:
        return func(*args, **dargs)
    except error.AutotestError:
        # Pass already-categorized errors on up as is.
        raise
    except Exception, e:
        # Other exceptions must be treated as a FAIL when
        # raised during the test functions
        raise error.UnhandledTestFail(e)


def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={},
            before_test_hook=None, after_test_hook=None,
            before_iteration_hook=None, after_iteration_hook=None):
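    """Locates (or installs) a test, instantiates it and runs it.

    If url ends in '.tar.bz2' the test package is downloaded and installed
    first; otherwise the test is looked up in job.testdir (or
    job.site_testdir, which takes precedence when the test exists there).
    The test class is then imported, instantiated and executed via its
    _exec() method, with any supplied before/after test and iteration hooks
    applied.
    """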
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()

    # if this is not a plain test name then download and install the
    # specified test
    if url.endswith('.tar.bz2'):
        (group, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', group, testname)
        site_bindir = None
    else:
        # if the test is local, it can be found in either testdir
        # or site_testdir. tests in site_testdir override tests
        # defined in testdir
        (group, testname) = ('', url)
        bindir = os.path.join(job.testdir, group, testname)
        if hasattr(job, 'site_testdir'):
            site_bindir = os.path.join(job.site_testdir,
                                       group, testname)
        else:
            site_bindir = None

        # The job object here can be that of a server side job or a client
        # side job. 'install_pkg' method won't be present for server side
        # jobs, so do the fetch only if that method is present in the job
        # obj.
        if hasattr(job, 'install_pkg'):
            try:
                job.install_pkg(testname, 'test', bindir)
            except packages.PackageInstallError, e:
                # Fall back to any copy of the test code that may already
                # exist on the machine.
                pass

    outputdir = os.path.join(job.resultdir, testname)
    if tag:
        outputdir += '.' + tag

    # if we can find the test in site_bindir, use this version
    if site_bindir and os.path.exists(site_bindir):
        bindir = site_bindir
        testdir = job.site_testdir
    elif os.path.exists(bindir):
        testdir = job.testdir
    else:
        raise error.TestError(testname + ': test does not exist')

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    if group:
        sys.path.insert(0, os.path.join(testdir, 'download'))
        group += '.'
    else:
        sys.path.insert(0, os.path.join(testdir, testname))

    try:
        exec ("import %s%s" % (group, testname),
              local_namespace, global_namespace)
        exec ("mytest = %s%s.%s(job, bindir, outputdir)" %
              (group, testname, testname),
              local_namespace, global_namespace)
    finally:
        sys.path.pop(0)

    pwd = os.getcwd()
    os.chdir(outputdir)

    try:
        mytest = global_namespace['mytest']
        if before_test_hook:
            before_test_hook(mytest)

        # Register any iteration hooks that were passed in.
        if before_iteration_hook:
            mytest.register_before_iteration_hook(before_iteration_hook)
        if after_iteration_hook:
            mytest.register_after_iteration_hook(after_iteration_hook)
        mytest._exec(args, dargs)
    finally:
        os.chdir(pwd)
        if after_test_hook:
            after_test_hook(mytest)
        shutil.rmtree(mytest.tmpdir, ignore_errors=True)
644