test.py revision 14f9856aac6c3d43830553d849442914988349c6
# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialise
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       execute         run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       srcdir          eg. tests/<test>/src
#       tmpdir          eg. tmp/<testname.tag>
18
19import os, sys, re, fcntl, shutil, tarfile, warnings
20
21from autotest_lib.client.common_lib import error, utils
22
23
class base_test:
    """Base class that every individual test inherits from.

    A subclass must supply a ``version`` attribute (read by ``_exec``) and
    either a ``run_once`` method (called by the default ``execute``) or its
    own ``execute``.  The ``initialize``, ``setup``, ``warmup``,
    ``postprocess`` and ``cleanup`` hooks may be overridden as needed.
    """

    # Forwarded to utils.update_version() together with srcdir in _exec().
    preserve_srcdir = False

    def __init__(self, job, bindir, outputdir):
        """Record the test's directories and create the per-test ones.

        Creates the ``results``, ``profiling`` and ``debug`` directories
        under ``outputdir`` and a fresh temporary directory under
        ``job.tmpdir`` named after the basename of ``outputdir``.

        job       -- the job this test instance is part of
        bindir    -- directory holding the test; ``srcdir`` is ``bindir/src``
        outputdir -- existing directory that receives this test's output
        """
        self.job = job
        self.autodir = job.autodir

        self.outputdir = outputdir
        tagged_testname = os.path.basename(self.outputdir)
        self.resultsdir = os.path.join(self.outputdir, 'results')
        os.mkdir(self.resultsdir)
        self.profdir = os.path.join(self.outputdir, 'profiling')
        os.mkdir(self.profdir)
        self.debugdir = os.path.join(self.outputdir, 'debug')
        os.mkdir(self.debugdir)
        self.bindir = bindir
        if hasattr(job, 'libdir'):
            self.libdir = job.libdir
        self.srcdir = os.path.join(self.bindir, 'src')

        self.tmpdir = os.path.join(job.tmpdir, tagged_testname)

        # Always start from an empty temporary directory.
        if os.path.exists(self.tmpdir):
            shutil.rmtree(self.tmpdir)
        os.mkdir(self.tmpdir)


    def assert_(self, expr, msg='Assertion failed.'):
        """Raise error.TestError(msg) when expr is false."""
        if not expr:
            raise error.TestError(msg)


    def write_test_keyval(self, attr_dict):
        """Write attr_dict as keyvals into the test's output directory."""
        utils.write_keyval(self.outputdir, attr_dict)


    @staticmethod
    def _append_type_to_keys(dictionary, typename):
        """Return a copy of dictionary with "{typename}" appended to keys."""
        new_dict = {}
        for key, value in dictionary.items():
            new_dict["%s{%s}" % (key, typename)] = value
        return new_dict


    def write_perf_keyval(self, perf_dict):
        """Record one iteration that only has performance keyvals."""
        self.write_iteration_keyval({}, perf_dict)


    def write_attr_keyval(self, attr_dict):
        """Record one iteration that only has attribute keyvals."""
        self.write_iteration_keyval(attr_dict, {})


    def write_iteration_keyval(self, attr_dict, perf_dict):
        """Write one iteration's keyvals into the results directory.

        Non-empty dictionaries are written through utils.write_keyval with
        their keys suffixed by "{attr}" / "{perf}".  A blank line is then
        appended to ``<resultsdir>/keyval``, even when both are empty.
        """
        if attr_dict:
            attr_dict = self._append_type_to_keys(attr_dict, "attr")
            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr")

        if perf_dict:
            perf_dict = self._append_type_to_keys(perf_dict, "perf")
            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf")

        keyval_path = os.path.join(self.resultsdir, "keyval")
        # Close the file explicitly instead of relying on garbage collection.
        keyval_file = open(keyval_path, "a")
        try:
            keyval_file.write("\n")
        finally:
            keyval_file.close()


    def initialize(self):
        """Hook run first by _exec(); the default only prints a notice."""
        print('No initialize phase defined')


    def setup(self):
        """Hook handed to utils.update_version() by _exec(); default no-op."""
        pass


    def warmup(self, *args, **dargs):
        """Hook called by execute() before the iterations; default no-op."""
        pass


    def execute(self, iterations=1, *args, **dargs):
        """Run the test body.

        Calls warmup(), then run_once() ``iterations`` times, then one
        extra run_once() wrapped in the job's profilers when any are
        present, and finally postprocess().  The remaining arguments are
        passed unchanged to warmup() and run_once().
        """
        self.warmup(*args, **dargs)

        profilers = self.job.profilers
        # Dropped profilers.only() - if you want that, use iterations=0
        for _ in range(iterations):
            self.run_once(*args, **dargs)

        # Do a profiling run if necessary
        if profilers.present():
            profilers.start(self)
            self.run_once(*args, **dargs)
            profilers.stop(self)
            profilers.report(self)

        # Do any postprocessing, normally extracting performance keyvals, etc
        self.postprocess()


    def postprocess(self):
        """Hook called at the end of execute(); default no-op."""
        pass


    def cleanup(self):
        """Hook called by _exec() once the run is over, pass or fail."""
        pass


    def _exec(self, args, dargs):
        """Drive a complete test run: initialize, setup, execute, cleanup.

        The job's stdout/stderr are tee'd into ``<debugdir>/stdout`` and
        ``<debugdir>/stderr`` for the whole run and restored on every exit
        path.  ``dargs['test_attributes']`` (if given) plus the test
        version are written out as test keyvals; the remaining dargs are
        validated and handed to each phase according to its signature.

        Raises error.AutotestError subclasses unchanged; any other
        exception from execute() is wrapped in error.UnhandledTestFail,
        and any other exception from the remaining phases in
        error.UnhandledTestError.
        """
        self.job.stdout.tee_redirect(os.path.join(self.debugdir, 'stdout'))
        self.job.stderr.tee_redirect(os.path.join(self.debugdir, 'stderr'))

        try:
            try:
                # write out the test attributes into a keyval
                dargs = dargs.copy()
                keyvals = dargs.pop('test_attributes', dict()).copy()
                keyvals['version'] = self.version
                self.write_test_keyval(keyvals)

                _validate_args(args, dargs, self.initialize, self.setup,
                               self.execute, self.cleanup)

                try:
                    # Initialize:
                    p_args, p_dargs = _cherry_pick_args(self.initialize,
                                                        args, dargs)
                    self.initialize(*p_args, **p_dargs)

                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup,
                                                        args, dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)

                    # Execute:
                    os.chdir(self.outputdir)
                    if hasattr(self, 'run_once'):
                        # The default execute() forwards everything to
                        # run_once(), so select by run_once's signature
                        # and pass 'iterations' through for execute().
                        p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                            args, dargs)
                        if 'iterations' in dargs:
                            p_dargs['iterations'] = dargs['iterations']
                    else:
                        p_args, p_dargs = _cherry_pick_args(self.execute,
                                                            args, dargs)
                    try:
                        self.execute(*p_args, **p_dargs)
                    except error.AutotestError:
                        raise
                    except Exception:
                        # sys.exc_info() instead of "except X, e" keeps
                        # this parseable on every Python version.
                        raise error.UnhandledTestFail(sys.exc_info()[1])

                finally:
                    self.cleanup()
            except error.AutotestError:
                raise
            except Exception:
                raise error.UnhandledTestError(sys.exc_info()[1])
        finally:
            # Undo the redirection on every exit path, including failures
            # that happen before initialize() or inside cleanup().
            self.job.stderr.restore()
            self.job.stdout.restore()
180
181
182def _cherry_pick_args(func, args, dargs):
183    # Cherry pick args:
184    if func.func_code.co_flags & 0x04:
185        # func accepts *args, so return the entire args.
186        p_args = args
187    else:
188        p_args = ()
189
190    # Cherry pick dargs:
191    if func.func_code.co_flags & 0x08:
192        # func accepts **dargs, so return the entire dargs.
193        p_dargs = dargs
194    else:
195        p_dargs = {}
196        for param in func.func_code.co_varnames[:func.func_code.co_argcount]:
197            if param in dargs:
198                p_dargs[param] = dargs[param]
199
200    return p_args, p_dargs
201
202
203def _validate_args(args, dargs, *funcs):
204    all_co_flags = 0
205    all_varnames = ()
206    for func in funcs:
207        all_co_flags |= func.func_code.co_flags
208        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]
209
210    # Check if given args belongs to at least one of the methods below.
211    if len(args) > 0:
212        # Current implementation doesn't allow the use of args.
213        raise error.AutotestError('Unnamed arguments not accepted. Please, ' \
214                        'call job.run_test with named args only')
215
216    # Check if given dargs belongs to at least one of the methods below.
217    if len(dargs) > 0:
218        if not all_co_flags & 0x08:
219            # no func accepts *dargs, so:
220            for param in dargs:
221                if not param in all_varnames:
222                    raise error.AutotestError('Unknown parameter: %s' % param)
223
224
225def testname(url):
226    # Extract the testname from the test url.
227    match = re.match('[^:]+://(.*)/([^/]*)$', url)
228    if not match:
229        return ('', url)
230    (group, filename) = match.groups()
231
232    # Generate the group prefix.
233    group = re.sub(r'\W', '_', group)
234
235    # Drop the extension to get the raw test name.
236    testname = re.sub(r'\.tgz', '', filename)
237
238    return (group, testname)
239
240
def _installtest(job, url):
    """Download and unpack the test at url unless it is already installed.

    The archive is unpacked into ``<job.testdir>/download/<group>``, where
    (group, name) come from testname(url).  Nothing is downloaded when
    ``<group dir>/<name>`` already exists.

    Returns the (group, name) pair for the test.
    """
    (group, name) = testname(url)

    # Bail if the test is already installed
    group_dir = os.path.join(job.testdir, "download", group)
    if os.path.exists(os.path.join(group_dir, name)):
        return (group, name)

    # If the group directory is missing create it and add
    # an empty  __init__.py so that sub-directories are
    # considered for import.
    if not os.path.exists(group_dir):
        os.mkdir(group_dir)
        f = open(os.path.join(group_dir, '__init__.py'), 'w+')
        f.close()

    print(name + ": installing test url=" + url)
    tarball = os.path.join(group_dir, 'test.tgz')
    utils.get_file(url, tarball)
    try:
        tar = tarfile.open(tarball)
        try:
            # Extract with an explicit target directory rather than
            # chdir()ing, so a failed extraction cannot leave the process
            # in a different working directory.
            # NOTE(review): member paths are not validated; an archive
            # from an untrusted url could write outside group_dir.
            for member in tar.getmembers():
                tar.extract(member, group_dir)
        finally:
            tar.close()
    finally:
        # The downloaded archive is never kept, even when unpacking fails.
        os.remove(tarball)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(group_dir, name,
                            '__init__.py'))

    # The test is now installed.
    return (group, name)
276
277
def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={}, after_test_hook=None):
    """Locate (installing if necessary), instantiate and run one test.

    job              -- job object; its testdir, resultdir and tmpdir (and
                        site_testdir, when present) are used
    url              -- plain test name, or a url to download the test from
    tag              -- optional suffix appended to the output dir as ".tag"
    args, dargs      -- arguments handed to the test's _exec()
    local_namespace, global_namespace
                     -- namespaces used to import and construct the test;
                        only copies of them are modified
    after_test_hook  -- optional callable invoked with the test object
                        once the run is over, whether or not it failed

    Raises error.TestError when the test directory cannot be found.
    Exceptions raised by the test's _exec() propagate to the caller.
    The working directory in effect on entry is restored before returning.
    """
    # The default dicts are shared between calls, so only ever touch copies.
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()

    # if this is not a plain test name then download and install the
    # specified test
    if utils.is_url(url):
        (group, name) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', group, name)
        site_bindir = None
    else:
        # if the test is local, it can be found in either testdir
        # or site_testdir. tests in site_testdir override tests
        # defined in testdir
        (group, name) = ('', url)
        bindir = os.path.join(job.testdir, group, name)
        if hasattr(job, 'site_testdir'):
            site_bindir = os.path.join(job.site_testdir, group, name)
        else:
            site_bindir = None

    outputdir = os.path.join(job.resultdir, name)
    if tag:
        outputdir += '.' + tag

    # if we can find the test in site_bindir, use this version
    if site_bindir and os.path.exists(site_bindir):
        bindir = site_bindir
        testdir = job.site_testdir
    elif os.path.exists(bindir):
        testdir = job.testdir
    else:
        raise error.TestError(name + ': test does not exist')

    if group:
        sys.path.insert(0, os.path.join(testdir, 'download'))
        group += '.'
    else:
        sys.path.insert(0, os.path.join(testdir, name))

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    # The import and construction happen while holding an exclusive lock
    # on <job.tmpdir>/.testlock.
    lockfile = open(os.path.join(job.tmpdir, '.testlock'), 'w')
    try:
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        # exec takes (code, globals, locals): names bound by these
        # statements, 'mytest' included, end up in global_namespace.
        exec ("import %s%s" % (group, name),
              local_namespace, global_namespace)
        exec ("mytest = %s%s.%s(job, bindir, outputdir)" %
              (group, name, name),
              local_namespace, global_namespace)
    finally:
        fcntl.flock(lockfile, fcntl.LOCK_UN)
        lockfile.close()
        sys.path.pop(0)

    # Fetch the instance before entering the try block so the finally
    # clause below can never see an unbound 'mytest'.
    mytest = global_namespace['mytest']

    pwd = os.getcwd()
    os.chdir(outputdir)
    try:
        mytest._exec(args, dargs)
    finally:
        try:
            if after_test_hook:
                after_test_hook(mytest)
        finally:
            # Put the caller back in the directory it started from.
            os.chdir(pwd)
345