test.py revision 47faacb31010a57d56296079bbbaf2f141b2a4e0
# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialize
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       run             run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       src             eg. tests/<test>/src
#       tmpdir          eg. tmp/<tempname>_<testname.tag>

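# Illustrative sketch (not part of this module): a minimal test built on the
# entry points above, assuming the conventional test.test subclass exported
# by the client test module; sleeptest and its 'seconds' parameter are
# hypothetical.
#
#     import time
#     from autotest_lib.client.bin import test
#
#     class sleeptest(test.test):
#         version = 1
#
#         def run_once(self, seconds=1):
#             time.sleep(seconds)
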
import os, sys, re, fcntl, shutil, tarfile, time, warnings, tempfile

from autotest_lib.client.common_lib import error, utils, packages
from autotest_lib.client.bin import autotest_utils


class base_test:
    preserve_srcdir = False

    def __init__(self, job, bindir, outputdir):
        self.job = job
        self.autodir = job.autodir

        self.outputdir = outputdir
        tagged_testname = os.path.basename(self.outputdir)
        self.resultsdir = os.path.join(self.outputdir, 'results')
        os.mkdir(self.resultsdir)
        self.profdir = os.path.join(self.outputdir, 'profiling')
        os.mkdir(self.profdir)
        self.debugdir = os.path.join(self.outputdir, 'debug')
        os.mkdir(self.debugdir)
        self.bindir = bindir
        if hasattr(job, 'libdir'):
            self.libdir = job.libdir
        self.srcdir = os.path.join(self.bindir, 'src')
        self.tmpdir = tempfile.mkdtemp("_" + tagged_testname, dir=job.tmpdir)


    def assert_(self, expr, msg='Assertion failed.'):
        if not expr:
            raise error.TestError(msg)


    def write_test_keyval(self, attr_dict):
        utils.write_keyval(self.outputdir, attr_dict)


    @staticmethod
    def _append_type_to_keys(dictionary, typename):
        new_dict = {}
        for key, value in dictionary.iteritems():
            new_key = "%s{%s}" % (key, typename)
            new_dict[new_key] = value
        return new_dict


    def write_perf_keyval(self, perf_dict):
        self.write_iteration_keyval({}, perf_dict)


    def write_attr_keyval(self, attr_dict):
        self.write_iteration_keyval(attr_dict, {})


    def write_iteration_keyval(self, attr_dict, perf_dict):
        if attr_dict:
            attr_dict = self._append_type_to_keys(attr_dict, "attr")
            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr")

        if perf_dict:
            perf_dict = self._append_type_to_keys(perf_dict, "perf")
            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf")

        # append a blank line to mark the end of this iteration's keyvals
        keyval_path = os.path.join(self.resultsdir, "keyval")
        print >> open(keyval_path, "a"), ""


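    # For example (illustrative): write_perf_keyval({'throughput': 100}) would
    # append a line of the form "throughput{perf}=100" to <resultsdir>/keyval,
    # followed by the blank separator, so each iteration gets its own block.
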
    def initialize(self):
        print 'No initialize phase defined'


    def setup(self):
        pass


    def warmup(self, *args, **dargs):
        pass


    def execute(self, iterations=None, test_length=None, *args, **dargs):
        """
        This is the basic execute method for the tests inherited from
        base_test. If you want to implement a benchmark test, it's better
        to implement the run_once function so that it works with the
        profiling infrastructure. For other tests, you can just override
        the default implementation.

        @param iterations: The number of times to call the run_once
                function. This parameter is incompatible with test_length
                and will be ignored (with a warning) if both are specified.

        @param test_length: The minimum test length in seconds. run_once
                will be called as many times as needed to cover the
                minimum test length.
        """

        self.warmup(*args, **dargs)
        # For our special class of tests, the benchmarks, we don't want
        # profilers to run during the test iterations. Let's reserve only
        # the last iteration for profiling, if needed. So let's stop
        # all profilers if they are present and active.
        profilers = self.job.profilers
        if profilers.active():
            profilers.stop(self)
        # If the user called this test in an odd way (specified both
        # iterations and test_length), warn them and use timed execution.
        if iterations and test_length:
            print 'Iterations parameter ignored (timed execution)'
        if test_length:
            test_start = time.time()
            time_elapsed = 0
            timed_counter = 0
            print 'Benchmark started. Minimum test length: %d s' % test_length
            while time_elapsed < test_length:
                timed_counter += 1
                if time_elapsed == 0:
                    print 'Executing iteration %d' % timed_counter
                else:
                    print 'Executing iteration %d, time_elapsed %d s' % \
                          (timed_counter, time_elapsed)
                self.run_once(*args, **dargs)
                test_iteration_finish = time.time()
                time_elapsed = test_iteration_finish - test_start
            print 'Benchmark finished after %d iterations' % timed_counter
            print 'Time elapsed: %d s' % time_elapsed
        else:
            if not iterations:
                iterations = 1
            # Dropped profilers.only() - if you want that, use iterations=0
            print 'Benchmark started. Number of iterations: %d' % iterations
            # expose the current iteration to run_once via self.iteration
            for self.iteration in range(1, iterations + 1):
                print 'Executing iteration %d of %d' % (self.iteration,
                                                        iterations)
                self.run_once(*args, **dargs)
            print 'Benchmark finished after %d iterations' % iterations

        # Do a profiling run if necessary
        if profilers.present():
            profilers.start(self)
            print 'Profilers present. Profiling run started'
            self.run_once(*args, **dargs)
            profilers.stop(self)
            profilers.report(self)

        # Do any postprocessing, normally extracting performance keyvals, etc
        self.postprocess()


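    # For example (illustrative), a benchmark defining run_once could be
    # driven as job.run_test('mytest', iterations=5) for a fixed number of
    # iterations, or as job.run_test('mytest', test_length=600) for a timed
    # run of at least 600 seconds ('mytest' is hypothetical).
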
    def postprocess(self):
        pass


    def cleanup(self):
        pass


    def _exec(self, args, dargs):
        self.job.stdout.tee_redirect(os.path.join(self.debugdir, 'stdout'))
        self.job.stderr.tee_redirect(os.path.join(self.debugdir, 'stderr'))

        try:
            # write out the test attributes into a keyval
            dargs   = dargs.copy()
            keyvals = dargs.pop('test_attributes', dict()).copy()
            keyvals['version'] = self.version
            for i, arg in enumerate(args):
                keyvals['param-%d' % i] = repr(arg)
            for name, arg in dargs.iteritems():
                keyvals['param-%s' % name] = repr(arg)
            self.write_test_keyval(keyvals)

            _validate_args(args, dargs, self.initialize, self.setup,
                           self.execute, self.cleanup)

            try:
                # Initialize:
                p_args, p_dargs = _cherry_pick_args(self.initialize,
                                                    args, dargs)
                self.initialize(*p_args, **p_dargs)

                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_EX)
                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup,
                                                        args, dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)
                finally:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                    lockfile.close()

                # Execute:
                if self.job.drop_caches:
                    print "Dropping caches before running test"
                    autotest_utils.drop_caches()

                os.chdir(self.outputdir)
                if hasattr(self, 'run_once'):
                    p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                        args, dargs)
                    # the args were picked for run_once, but the execute
                    # wrapper still needs iterations and test_length, so
                    # forward them explicitly
                    if 'iterations' in dargs:
                        p_dargs['iterations'] = dargs['iterations']
                    if 'test_length' in dargs:
                        p_dargs['test_length'] = dargs['test_length']
                else:
                    p_args, p_dargs = _cherry_pick_args(self.execute,
                                                        args, dargs)
                try:
                    self.execute(*p_args, **p_dargs)
                except error.AutotestError:
                    raise
                except Exception, e:
                    raise error.UnhandledTestFail(e)
            except:
                exc_info = sys.exc_info()
            else:
                exc_info = None

            # run the cleanup, and then restore the job.std* streams
            try:
                p_args, p_dargs = _cherry_pick_args(self.cleanup, args, dargs)
                # if an exception occurs during the cleanup() call, we
                # don't want it to override an existing exception
                # (i.e. exc_info) that was thrown by the test execution
                if exc_info:
                    try:
                        self.cleanup(*p_args, **p_dargs)
                    finally:
                        try:
                            raise exc_info[0], exc_info[1], exc_info[2]
                        finally:
                            # necessary to prevent a circular reference
                            # between exc_info[2] (the traceback, which
                            # references all the exception stack frames)
                            # and this stack frame (which refs exc_info[2])
                            del exc_info
                else:
                    self.cleanup(*p_args, **p_dargs)
            finally:
                self.job.stderr.restore()
                self.job.stdout.restore()

        except error.AutotestError:
            raise
        except Exception, e:
            raise error.UnhandledTestError(e)


def _cherry_pick_args(func, args, dargs):
    # Cherry pick args:
    if func.func_code.co_flags & 0x04:
        # func accepts *args, so return the entire args (0x04 == CO_VARARGS)
        p_args = args
    else:
        p_args = ()

    # Cherry pick dargs:
    if func.func_code.co_flags & 0x08:
        # func accepts **dargs, so return the entire dargs
        # (0x08 == CO_VARKEYWORDS)
        p_dargs = dargs
    else:
        p_dargs = {}
        for param in func.func_code.co_varnames[:func.func_code.co_argcount]:
            if param in dargs:
                p_dargs[param] = dargs[param]

    return p_args, p_dargs


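# For example (illustrative): given def f(a, b=1, *extra), _cherry_pick_args
# returns p_args == args, since CO_VARARGS is set, and p_dargs holding only
# the 'a' and 'b' entries of dargs; if f instead accepted **kw, p_dargs would
# be the entire dargs.
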
def _validate_args(args, dargs, *funcs):
    all_co_flags = 0
    all_varnames = ()
    for func in funcs:
        all_co_flags |= func.func_code.co_flags
        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]

    # Check if the given args belong to at least one of the given methods.
    if len(args) > 0:
        # Current implementation doesn't allow the use of args.
        raise error.AutotestError('Unnamed arguments not accepted. Please '
                                  'call job.run_test with named args only')

    # Check if the given dargs belong to at least one of the given methods.
    if len(dargs) > 0:
        if not all_co_flags & 0x08:
            # no func accepts **dargs, so check each param individually
            for param in dargs:
                if param not in all_varnames:
                    raise error.AutotestError('Unknown parameter: %s' % param)


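# For example (illustrative): passing positional args, as in
# job.run_test('mytest', 1, 2), is rejected above with 'Unnamed arguments not
# accepted', and an unknown keyword such as bogus_param=1 raises
# 'Unknown parameter: bogus_param' unless some phase accepts **dargs.
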
def _installtest(job, url):
    (group, name) = job.pkgmgr.get_package_name(url, 'test')

    # Bail if the test is already installed
    group_dir = os.path.join(job.testdir, "download", group)
    if os.path.exists(os.path.join(group_dir, name)):
        return (group, name)

    # If the group directory is missing create it and add
    # an empty __init__.py so that sub-directories are
    # considered for import.
    if not os.path.exists(group_dir):
        os.mkdir(group_dir)
        f = open(os.path.join(group_dir, '__init__.py'), 'w')
        f.close()

    print name + ": installing test url=" + url
    tarball = os.path.basename(url)
    tarball_path = os.path.join(group_dir, tarball)
    test_dir = os.path.join(group_dir, name)
    job.pkgmgr.fetch_pkg(tarball, tarball_path,
                         repo_url=os.path.dirname(url))

    # Create the directory for the test
    if not os.path.exists(test_dir):
        os.mkdir(test_dir)

    job.pkgmgr.untar_pkg(tarball_path, test_dir)

    os.remove(tarball_path)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(test_dir, '__init__.py'))

    # The test is now installed.
    return (group, name)


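# For example (illustrative), after _installtest(job, url) succeeds for a
# downloadable test, the layout is:
#
#     <job.testdir>/download/<group>/__init__.py
#     <job.testdir>/download/<group>/<name>/<name>.py   (from the tarball)
#     <job.testdir>/download/<group>/<name>/__init__.py -> <name>.py
#
# which makes the test importable as <group>.<name>.
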
def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={}, after_test_hook=None):
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()

    # if this is not a plain test name then download and install the
    # specified test
    if url.endswith('.tar.bz2'):
        (group, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', group, testname)
        site_bindir = None
    else:
        # if the test is local, it can be found in either testdir
        # or site_testdir. tests in site_testdir override tests
        # defined in testdir
        (group, testname) = ('', url)
        bindir = os.path.join(job.testdir, group, testname)
        if hasattr(job, 'site_testdir'):
            site_bindir = os.path.join(job.site_testdir,
                                       group, testname)
        else:
            site_bindir = None

        # The job object here can be that of a server side job or a client
        # side job. 'install_pkg' method won't be present for server side
        # jobs, so do the fetch only if that method is present in the job
        # obj.
        if hasattr(job, 'install_pkg'):
            try:
                job.install_pkg(testname, 'test', bindir)
            except packages.PackageInstallError:
                # continue as a fall back mechanism and see if the test code
                # already exists on the machine
                pass

    outputdir = os.path.join(job.resultdir, testname)
    if tag:
        outputdir += '.' + tag

    # if we can find the test in site_bindir, use this version
    if site_bindir and os.path.exists(site_bindir):
        bindir = site_bindir
        testdir = job.site_testdir
    elif os.path.exists(bindir):
        testdir = job.testdir
    else:
        raise error.TestError(testname + ': test does not exist')

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    if group:
        sys.path.insert(0, os.path.join(testdir, 'download'))
        group += '.'
    else:
        sys.path.insert(0, os.path.join(testdir, testname))

    try:
        exec ("import %s%s" % (group, testname),
              local_namespace, global_namespace)
        exec ("mytest = %s%s.%s(job, bindir, outputdir)" %
              (group, testname, testname),
              local_namespace, global_namespace)
    finally:
        sys.path.pop(0)

    pwd = os.getcwd()
    os.chdir(outputdir)
    try:
        mytest = global_namespace['mytest']
        mytest._exec(args, dargs)
    finally:
        os.chdir(pwd)
        if after_test_hook:
            after_test_hook(mytest)
        shutil.rmtree(mytest.tmpdir, ignore_errors=True)
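
# For example (illustrative): runtest is normally reached through
# job.run_test('sleeptest', seconds=1), which resolves the test under
# job.testdir (or job.site_testdir), imports it, instantiates it with
# (job, bindir, outputdir) and hands the named args to _exec; the test
# name and 'seconds' parameter are hypothetical.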