test.py revision a49c5cb549607a2049c45eed66200e48aae48c3b
# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialise
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       execute         run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       srcdir          eg. tests/<test>/src
#       tmpdir          eg. tmp/<tempname>_<testname.tag>
18
19import fcntl, os, re, sys, shutil, tarfile, tempfile, time, traceback
20import warnings
21
22from autotest_lib.client.common_lib import error, packages, debug
23from autotest_lib.client.bin import utils
24
25
26class base_test:
27    preserve_srcdir = False
28
29    def __init__(self, job, bindir, outputdir):
30        self.job = job
31        self.pkgmgr = job.pkgmgr
32        self.autodir = job.autodir
33
34        self.outputdir = outputdir
35        tagged_testname = os.path.basename(self.outputdir)
36        self.resultsdir = os.path.join(self.outputdir, 'results')
37        os.mkdir(self.resultsdir)
38        self.profdir = os.path.join(self.outputdir, 'profiling')
39        os.mkdir(self.profdir)
40        self.debugdir = os.path.join(self.outputdir, 'debug')
41        os.mkdir(self.debugdir)
42        self.bindir = bindir
43        if hasattr(job, 'libdir'):
44            self.libdir = job.libdir
45        self.srcdir = os.path.join(self.bindir, 'src')
46        self.tmpdir = tempfile.mkdtemp("_" + tagged_testname, dir=job.tmpdir)
47        self.test_log = debug.get_logger(module='tests')
48
49
50    def assert_(self, expr, msg='Assertion failed.'):
51        if not expr:
52            raise error.TestError(msg)
53
54
55    def write_test_keyval(self, attr_dict):
56        utils.write_keyval(self.outputdir, attr_dict)
57
58
59    @staticmethod
60    def _append_type_to_keys(dictionary, typename):
61        new_dict = {}
62        for key, value in dictionary.iteritems():
63            new_key = "%s{%s}" % (key, typename)
64            new_dict[new_key] = value
65        return new_dict
66
67
68    def write_perf_keyval(self, perf_dict):
69        self.write_iteration_keyval({}, perf_dict)
70
71
72    def write_attr_keyval(self, attr_dict):
73        self.write_iteration_keyval(attr_dict, {})
74
75
76    def write_iteration_keyval(self, attr_dict, perf_dict):
77        if attr_dict:
78            attr_dict = self._append_type_to_keys(attr_dict, "attr")
79            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr")
80
81        if perf_dict:
82            perf_dict = self._append_type_to_keys(perf_dict, "perf")
83            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf")
84
85        keyval_path = os.path.join(self.resultsdir, "keyval")
86        print >> open(keyval_path, "a"), ""
87
88
89    def initialize(self):
90        print 'No initialize phase defined'
91        pass
92
93
94    def setup(self):
95        pass
96
97
98    def warmup(self, *args, **dargs):
99        pass
100
101
102    def drop_caches_between_iterations(self):
103        if self.job.drop_caches_between_iterations:
104            print "Dropping caches between iterations"
105            utils.drop_caches()
106
107
108    def execute(self, iterations=None, test_length=None, profile_only=False,
109                _get_time=time.time, postprocess_profiled_run=None,
110                *args, **dargs):
111        """
112        This is the basic execute method for the tests inherited from base_test.
113        If you want to implement a benchmark test, it's better to implement
114        the run_once function, to cope with the profiling infrastructure. For
115        other tests, you can just override the default implementation.
116
117        @param test_length: The minimum test length in seconds. We'll run the
118            run_once function for a number of times large enough to cover the
119            minimum test length.
120
121        @param iterations: A number of iterations that we'll run the run_once
122            function. This parameter is incompatible with test_length and will
123            be silently ignored if you specify both.
124
125        @param profile_only: Do not run any test iterations before running
126            the test under the profiler.  This is equivalent to specifying
127            iterations=0 but is much easier to remember/read/comprehend when
128            making control files with job.run_test(profile_only=True) in it
129            rather than job.run_test(iterations=0).
130
131        @param _get_time: [time.time] Used for unit test time injection.
132
133        @param postprocess_profiled_run: Run the postprocessing for the
134            profiled run.
135        """
136
137        self.warmup(*args, **dargs)
138        # For our special class of tests, the benchmarks, we don't want
139        # profilers to run during the test iterations. Let's reserve only
140        # the last iteration for profiling, if needed. So let's stop
141        # all profilers if they are present and active.
142        profilers = self.job.profilers
143        if profilers.active():
144            profilers.stop(self)
145        # If the user called this test in an odd way (specified both iterations
146        # and test_length), let's warn them.
147        if iterations and test_length:
148            self.test_log.info(
149                    'Iterations parameter ignored (timed execution).')
150        if test_length:
151            test_start = _get_time()
152            time_elapsed = 0
153            timed_counter = 0
154            self.test_log.info('Test started. Minimum test length: %d s',
155                               test_length)
156            while time_elapsed < test_length:
157                timed_counter = timed_counter + 1
158                if time_elapsed == 0:
159                    self.test_log.info('Executing iteration %d', timed_counter)
160                elif time_elapsed > 0:
161                    self.test_log.info(
162                            'Executing iteration %d, time_elapsed %d s',
163                            timed_counter, time_elapsed)
164                self.drop_caches_between_iterations()
165                self.run_once(*args, **dargs)
166                test_iteration_finish = _get_time()
167                time_elapsed = test_iteration_finish - test_start
168            self.test_log.info('Test finished after %d iterations',
169                               timed_counter)
170            self.test_log.info('Time elapsed: %d s', time_elapsed)
171        else:
172            orig_iterations = iterations
173            if profile_only:
174                if iterations:
175                    self.test_log.info('Iterations parameter ignored '
176                                       '(profile_only=True).')
177                iterations = 0
178            elif iterations is None:
179                iterations = 1
180            if iterations:
181                self.test_log.info('Test started. '
182                                   'Number of iterations: %d', iterations)
183                for self.iteration in xrange(1, iterations+1):
184                    self.test_log.info('Executing iteration %d of %d',
185                                       self.iteration, iterations)
186                    self.drop_caches_between_iterations()
187                    self.run_once(*args, **dargs)
188                    self.postprocess_iteration()
189                self.test_log.info('Test finished after %d iterations.',
190                                   iterations)
191
192        self.run_once_profiling(postprocess_profiled_run, *args, **dargs)
193
194        # Do any postprocessing, normally extracting performance keyvals, etc
195        self.postprocess()
196
197
198    def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
199        profilers = self.job.profilers
200        # Do a profiling run if necessary
201        if profilers.present():
202            self.drop_caches_between_iterations()
203            profilers.start(self)
204            print 'Profilers present. Profiling run started'
205            try:
206                self.iteration = 0       # indicator this is a profiling run
207                self.run_once(*args, **dargs)
208
209                # Priority to the run_once() argument over the attribute.
210                postprocess_attribute = getattr(self,
211                                                'postprocess_profiled_run',
212                                                False)
213
214                if (postprocess_profiled_run or
215                    (postprocess_profiled_run is None and
216                     postprocess_attribute)):
217                    self.postprocess_iteration()
218
219            finally:
220                profilers.stop(self)
221                profilers.report(self)
222
223
224    def postprocess(self):
225        pass
226
227
228    def postprocess_iteration(self):
229        pass
230
231
232    def cleanup(self):
233        pass
234
235
236    def _run_cleanup(self, args, dargs):
237        """Call self.cleanup and convert exceptions as appropriate.
238
239        Args:
240          args: An argument tuple to pass to cleanup.
241          dargs: A dictionary of with potential keyword arguments for cleanup.
242        """
243        p_args, p_dargs = _cherry_pick_args(self.cleanup, args, dargs)
244        try:
245            self.cleanup(*p_args, **p_dargs)
246        except error.AutotestError:
247            raise
248        except Exception, e:
249            # Other exceptions must be treated as a ERROR when
250            # raised during the cleanup() phase.
251            raise error.UnhandledTestError(e)
252
253
    def _exec(self, args, dargs):
        """Run the initialize, setup, execute and cleanup phases of the test.

        The job's stdout and stderr are tee-redirected into files named
        'stdout' and 'stderr' under debugdir while the phases run.

        Args:
          args: A tuple of positional arguments; _validate_args rejects a
              non-empty tuple.
          dargs: A dictionary of keyword arguments. It is copied before use;
              'run_cleanup' and 'test_attributes' are popped from the copy
              and the rest is offered to the individual phases.
        Raises:
          error.AutotestError: passed through unchanged.
          error.UnhandledTestFail: wraps any other Exception raised by
              execute().
          error.UnhandledTestError: wraps any other Exception raised
              elsewhere in this method.
        """

        self.job.stdout.tee_redirect(os.path.join(self.debugdir, 'stdout'))
        self.job.stderr.tee_redirect(os.path.join(self.debugdir, 'stderr'))

        # NOTE(review): the statements between here and the inner try below
        # run after the redirect but outside the handlers that call
        # restore(); if one of them raises, stdout/stderr do not appear to be
        # restored by this method - confirm whether that is intended.
        try:
            # write out the test attributes into a keyval
            dargs   = dargs.copy()
            run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
            keyvals = dargs.pop('test_attributes', {}).copy()
            # self.version is not assigned anywhere in this class; presumably
            # each concrete test defines it - verify against subclasses.
            keyvals['version'] = self.version
            for i, arg in enumerate(args):
                keyvals['param-%d' % i] = repr(arg)
            for name, arg in dargs.iteritems():
                keyvals['param-%s' % name] = repr(arg)
            self.write_test_keyval(keyvals)

            _validate_args(args, dargs, self.initialize, self.setup,
                           self.execute, self.cleanup)

            try:
                # Initialize:
                p_args, p_dargs = _cherry_pick_args(self.initialize,args,dargs)
                self.initialize(*p_args, **p_dargs)

                # An exclusive flock on <job.tmpdir>/.testlock is held around
                # the setup phase and released in the finally clause.
                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_EX)
                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup,args,dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)
                finally:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                    lockfile.close()

                # Execute:
                os.chdir(self.outputdir)
                if hasattr(self, 'run_once'):
                    # Arguments are chosen for run_once, because execute()
                    # forwards its extra arguments to it.
                    p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                        args, dargs)
                    # pull in any non-* and non-** args from self.execute
                    for param in _get_nonstar_args(self.execute):
                        if param in dargs:
                            p_dargs[param] = dargs[param]
                else:
                    p_args, p_dargs = _cherry_pick_args(self.execute,
                                                        args, dargs)
                try:
                    self.execute(*p_args, **p_dargs)
                except error.AutotestError:
                    # Pass already-categorized errors on up as is.
                    raise
                except Exception, e:
                    # Other exceptions must be treated as a FAIL when
                    # raised during the execute() phase.
                    raise error.UnhandledTestFail(e)
            except Exception:
                # Save the exception while we run our cleanup() before
                # reraising it.
                exc_info = sys.exc_info()
                try:
                    try:
                        if run_cleanup:
                            self._run_cleanup(args, dargs)
                    except Exception:
                        # A failing cleanup must not mask the original error.
                        print 'Ignoring exception during cleanup() phase:'
                        traceback.print_exc()
                        print 'Now raising the earlier %s error' % exc_info[0]
                finally:
                    self.job.stderr.restore()
                    self.job.stdout.restore()
                    # Re-raise the saved exception with its original
                    # traceback.
                    try:
                        raise exc_info[0], exc_info[1], exc_info[2]
                    finally:
                        # http://docs.python.org/library/sys.html#sys.exc_info
                        # Be nice and prevent a circular reference.
                        del exc_info
            else:
                # Success path: cleanup exceptions propagate here, but the
                # output streams are restored either way.
                try:
                    if run_cleanup:
                        self._run_cleanup(args, dargs)
                finally:
                    self.job.stderr.restore()
                    self.job.stdout.restore()
        except error.AutotestError:
            # Pass already-categorized errors on up.
            raise
        except Exception, e:
            # Anything else is an ERROR in our own code, not execute().
            raise error.UnhandledTestError(e)
346
347
348def _get_nonstar_args(func):
349    """Extract all the (normal) function parameter names.
350
351    Given a function, returns a tuple of parameter names, specifically
352    excluding the * and ** parameters, if the function accepts them.
353
354    @param func: A callable that we want to chose arguments for.
355
356    @return: A tuple of parameters accepted by the function.
357    """
358    return func.func_code.co_varnames[:func.func_code.co_argcount]
359
360
361def _cherry_pick_args(func, args, dargs):
362    """Sanitize positional and keyword arguments before calling a function.
363
364    Given a callable (func), an argument tuple and a dictionary of keyword
365    arguments, pick only those arguments which the function is prepared to
366    accept and return a new argument tuple and keyword argument dictionary.
367
368    Args:
369      func: A callable that we want to choose arguments for.
370      args: A tuple of positional arguments to consider passing to func.
371      dargs: A dictionary of keyword arguments to consider passing to func.
372    Returns:
373      A tuple of: (args tuple, keyword arguments dictionary)
374    """
375    # Cherry pick args:
376    if func.func_code.co_flags & 0x04:
377        # func accepts *args, so return the entire args.
378        p_args = args
379    else:
380        p_args = ()
381
382    # Cherry pick dargs:
383    if func.func_code.co_flags & 0x08:
384        # func accepts **dargs, so return the entire dargs.
385        p_dargs = dargs
386    else:
387        # Only return the keyword arguments that func accepts.
388        p_dargs = {}
389        for param in _get_nonstar_args(func):
390            if param in dargs:
391                p_dargs[param] = dargs[param]
392
393    return p_args, p_dargs
394
395
396def _validate_args(args, dargs, *funcs):
397    """Verify that arguments are appropriate for at least one callable.
398
399    Given a list of callables as additional parameters, verify that
400    the proposed keyword arguments in dargs will each be accepted by at least
401    one of the callables.
402
403    NOTE: args is currently not supported and must be empty.
404
405    Args:
406      args: A tuple of proposed positional arguments.
407      dargs: A dictionary of proposed keyword arguments.
408      *funcs: Callables to be searched for acceptance of args and dargs.
409    Raises:
410      error.AutotestError: if an arg won't be accepted by any of *funcs.
411    """
412    all_co_flags = 0
413    all_varnames = ()
414    for func in funcs:
415        all_co_flags |= func.func_code.co_flags
416        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]
417
418    # Check if given args belongs to at least one of the methods below.
419    if len(args) > 0:
420        # Current implementation doesn't allow the use of args.
421        raise error.TestError('Unnamed arguments not accepted. Please '
422                              'call job.run_test with named args only')
423
424    # Check if given dargs belongs to at least one of the methods below.
425    if len(dargs) > 0:
426        if not all_co_flags & 0x08:
427            # no func accepts *dargs, so:
428            for param in dargs:
429                if not param in all_varnames:
430                    raise error.AutotestError('Unknown parameter: %s' % param)
431
432
433def _installtest(job, url):
434    (group, name) = job.pkgmgr.get_package_name(url, 'test')
435
436    # Bail if the test is already installed
437    group_dir = os.path.join(job.testdir, "download", group)
438    if os.path.exists(os.path.join(group_dir, name)):
439        return (group, name)
440
441    # If the group directory is missing create it and add
442    # an empty  __init__.py so that sub-directories are
443    # considered for import.
444    if not os.path.exists(group_dir):
445        os.mkdir(group_dir)
446        f = file(os.path.join(group_dir, '__init__.py'), 'w+')
447        f.close()
448
449    print name + ": installing test url=" + url
450    tarball = os.path.basename(url)
451    tarball_path = os.path.join(group_dir, tarball)
452    test_dir = os.path.join(group_dir, name)
453    job.pkgmgr.fetch_pkg(tarball, tarball_path,
454                         repo_url = os.path.dirname(url))
455
456    # Create the directory for the test
457    if not os.path.exists(test_dir):
458        os.mkdir(os.path.join(group_dir, name))
459
460    job.pkgmgr.untar_pkg(tarball_path, test_dir)
461
462    os.remove(tarball_path)
463
464    # For this 'sub-object' to be importable via the name
465    # 'group.name' we need to provide an __init__.py,
466    # so link the main entry point to this.
467    os.symlink(name + '.py', os.path.join(group_dir, name,
468                            '__init__.py'))
469
470    # The test is now installed.
471    return (group, name)
472
473
def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={},
            before_test_hook=None, after_test_hook=None):
    """Locate, import, instantiate and run a single test.

    @param job: The job running the test. testdir and resultdir are read;
        site_testdir and install_pkg are used only when the job has them.
    @param url: Either a plain test name or the url of a '.tar.bz2' test
        package, which is downloaded and installed first.
    @param tag: Optional tag; when set, '.<tag>' is appended to the test's
        output directory name.
    @param args: Positional arguments handed to the test's _exec().
    @param dargs: Keyword-argument dictionary handed to the test's _exec().
    @param local_namespace: Namespace copied and passed to exec as the
        globals of the import/instantiation code; 'job', 'bindir' and
        'outputdir' are added to the copy.
    @param global_namespace: Namespace copied and passed to exec as the
        locals; the imported module and 'mytest' end up in the copy.
    @param before_test_hook: Optional callable invoked with the test
        instance before _exec().
    @param after_test_hook: Optional callable invoked with the test instance
        after _exec(), whether or not it raised.

    @raise error.TestError: if the test directory cannot be found.
    """
    # The shared default dicts are never mutated: only copies are used.
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()

    # if this is not a plain test name then download and install the
    # specified test
    if url.endswith('.tar.bz2'):
        (group, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', group, testname)
        site_bindir = None
    else:
        # if the test is local, it can be found in either testdir
        # or site_testdir. tests in site_testdir override tests
        # defined in testdir
        (group, testname) = ('', url)
        bindir = os.path.join(job.testdir, group, testname)
        if hasattr(job, 'site_testdir'):
            site_bindir = os.path.join(job.site_testdir,
                                       group, testname)
        else:
            site_bindir = None

        # The job object here can be that of a server side job or a client
        # side job. 'install_pkg' method won't be present for server side
        # jobs, so do the fetch only if that method is present in the job
        # obj.
        if hasattr(job, 'install_pkg'):
            try:
                job.install_pkg(testname, 'test', bindir)
            except packages.PackageInstallError:
                # continue as a fall back mechanism and see if the test code
                # already exists on the machine
                pass

    outputdir = os.path.join(job.resultdir, testname)
    if tag:
        outputdir += '.' + tag

    # if we can find the test in site_bindir, use this version
    if site_bindir and os.path.exists(site_bindir):
        bindir = site_bindir
        testdir = job.site_testdir
    elif os.path.exists(bindir):
        testdir = job.testdir
    else:
        raise error.TestError(testname + ': test does not exist')

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    # Temporarily put the directory that contains the test module (or the
    # download package root) at the front of sys.path for the import.
    if group:
        sys.path.insert(0, os.path.join(testdir, 'download'))
        group += '.'
    else:
        sys.path.insert(0, os.path.join(testdir, testname))

    try:
        exec ("import %s%s" % (group, testname),
              local_namespace, global_namespace)
        exec ("mytest = %s%s.%s(job, bindir, outputdir)" %
              (group, testname, testname),
              local_namespace, global_namespace)
    finally:
        sys.path.pop(0)

    pwd = os.getcwd()
    os.chdir(outputdir)
    try:
        mytest = global_namespace['mytest']
        if before_test_hook:
            before_test_hook(mytest)
        mytest._exec(args, dargs)
    finally:
        os.chdir(pwd)
        # Remove the test's temporary directory even when the hook raises,
        # so a failing hook cannot leak it.
        try:
            if after_test_hook:
                after_test_hook(mytest)
        finally:
            shutil.rmtree(mytest.tmpdir, ignore_errors=True)
553        shutil.rmtree(mytest.tmpdir, ignore_errors=True)
554