# test.py, revision 918863f5297653bd470674d29db0e4dc5c159a07
# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialise
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       run             run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       src             eg. tests/<test>/src
#       tmpdir          eg. tmp/<tempname>_<testname.tag>

#pylint: disable-msg=C0111

import fcntl, json, os, re, sys, shutil, tempfile, time, traceback
import logging

from autotest_lib.client.common_lib import error
from autotest_lib.client.bin import utils


class base_test(object):
    preserve_srcdir = False
    network_destabilizing = False

    def __init__(self, job, bindir, outputdir):
        self.job = job
        self.pkgmgr = job.pkgmgr
        self.autodir = job.autodir
        self.outputdir = outputdir
        self.tagged_testname = os.path.basename(self.outputdir)
        self.resultsdir = os.path.join(self.outputdir, 'results')
        os.mkdir(self.resultsdir)
        self.profdir = os.path.join(self.outputdir, 'profiling')
        os.mkdir(self.profdir)
        self.debugdir = os.path.join(self.outputdir, 'debug')
        os.mkdir(self.debugdir)
        # TODO(ericli): figure out how the autotest crash handler works with
        # the cros crash handler; we should restore it in the near term.
        # Once this is re-enabled, import getpass. crosbug.com/31232
        # if getpass.getuser() == 'root':
        #     self.configure_crash_handler()
        # else:
        self.crash_handling_enabled = False
        self.bindir = bindir
        self.srcdir = os.path.join(self.bindir, 'src')
        self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname,
                                       dir=job.tmpdir)
        self._keyvals = []
        self._new_keyval = False
        self.failed_constraints = []
        self.iteration = 0
        self.before_iteration_hooks = []
        self.after_iteration_hooks = []


    def configure_crash_handler(self):
        pass


    def crash_handler_report(self):
        pass


    def assert_(self, expr, msg='Assertion failed.'):
        if not expr:
            raise error.TestError(msg)


    def write_test_keyval(self, attr_dict):
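        """Writes attr_dict into the test-level keyval file in outputdir."""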
        utils.write_keyval(self.outputdir, attr_dict,
                           tap_report=self.job._tap)

    @staticmethod
    def _append_type_to_keys(dictionary, typename):
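        """Returns a copy of dictionary with {typename} appended to each key."""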
        new_dict = {}
        for key, value in dictionary.iteritems():
            new_key = "%s{%s}" % (key, typename)
            new_dict[new_key] = value
        return new_dict


    def output_perf_value(self, description, value, units,
                          higher_is_better=True):
        """
        Records a measured performance value in an output file.

        The output file will subsequently be parsed by the TKO parser to have
        the information inserted into the results database.

        @param description: A string describing the measured perf value. Must
                be maximum length 256, and may only contain letters, numbers,
                periods, dashes, and underscores.  For example:
                "page_load_time", "scrolling-frame-rate".
        @param value: A number representing the measured perf value, or a list
                of measured values if a test takes multiple measurements.
                Measured perf values can be either ints or floats.
        @param units: A string describing the units associated with the
                measured perf value. Must be maximum length 32, and may only
                contain letters, numbers, periods, dashes, and underscores.
                For example: "msec", "fps", "score", "runs_per_second".
        @param higher_is_better: A boolean indicating whether or not a "higher"
                measured perf value is considered to be better. If False, it is
                assumed that a "lower" measured value is considered to be
                better.

        """
        if len(description) > 256:
            raise ValueError('The description must be at most 256 characters.')
        if len(units) > 32:
            raise ValueError('The units must be at most 32 characters.')
        string_regex = re.compile(r'^[-\.\w]+$')
        if (not string_regex.search(description) or
            not string_regex.search(units)):
            raise ValueError('Invalid description or units string. May only '
                             'contain letters, numbers, periods, dashes, and '
                             'underscores.')

        entry = {
            'description': description,
            'value': value,
            'units': units,
            'higher_is_better': higher_is_better,
        }

        output_path = os.path.join(self.resultsdir, 'perf_measurements')
        with open(output_path, 'a') as fp:
            fp.write(json.dumps(entry, sort_keys=True) + '\n')


    def write_perf_keyval(self, perf_dict):
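        """Writes a dict of perf keyvals for the current iteration."""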
        self.write_iteration_keyval({}, perf_dict,
                                    tap_report=self.job._tap)


    def write_attr_keyval(self, attr_dict):
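        """Writes a dict of attribute keyvals for the current iteration."""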
        self.write_iteration_keyval(attr_dict, {},
                                    tap_report=self.job._tap)


    def write_iteration_keyval(self, attr_dict, perf_dict, tap_report=None):
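        """Writes attr and perf keyvals for this iteration into resultsdir.

        The raw dicts are also appended to self._keyvals; keys are tagged
        with "{attr}" or "{perf}" before being written, and a blank line is
        appended to the keyval file to separate iterations.
        """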
        # append the dictionaries before they have the {perf} and {attr} added
        self._keyvals.append({'attr':attr_dict, 'perf':perf_dict})
        self._new_keyval = True

        if attr_dict:
            attr_dict = self._append_type_to_keys(attr_dict, "attr")
            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr",
                               tap_report=tap_report)

        if perf_dict:
            perf_dict = self._append_type_to_keys(perf_dict, "perf")
            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf",
                               tap_report=tap_report)

        keyval_path = os.path.join(self.resultsdir, "keyval")
        print >> open(keyval_path, "a"), ""


    def analyze_perf_constraints(self, constraints):
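        """Evaluates constraint expressions against the latest perf keyvals.

        Does nothing unless new keyvals were written since the last call.
        Constraints that are not met, or cannot be evaluated, are recorded in
        self.failed_constraints for this iteration.
        """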
        if not self._new_keyval:
            return

        # create a dict from the keyvals suitable as an environment for eval
        keyval_env = self._keyvals[-1]['perf'].copy()
        keyval_env['__builtins__'] = None
        self._new_keyval = False
        failures = []

        # evaluate each constraint using the current keyvals
        for constraint in constraints:
            logging.info('___________________ constraint = %s', constraint)
            logging.info('___________________ keyvals = %s', keyval_env)

            try:
                if not eval(constraint, keyval_env):
                    failures.append('%s: constraint was not met' % constraint)
            except:
                failures.append('could not evaluate constraint: %s'
                                % constraint)

        # keep track of the errors for each iteration
        self.failed_constraints.append(failures)


    def process_failed_constraints(self):
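        """Raises error.TestFail if any iteration recorded failed constraints."""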
        msg = ''
        for i, failures in enumerate(self.failed_constraints):
            if failures:
                msg += 'iteration %d:%s  ' % (i, ','.join(failures))

        if msg:
            raise error.TestFail(msg)


    def register_before_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register a before_iteration_hook.
        This adds the method to the list of hooks which are executed
        before each iteration.

        @param iteration_hook: Method to run before each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.before_iteration_hooks.append(iteration_hook)


    def register_after_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register an after_iteration_hook.
        This adds the method to the list of hooks which are executed
        after each iteration.

        @param iteration_hook: Method to run after each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.after_iteration_hooks.append(iteration_hook)


    def initialize(self):
        pass


    def setup(self):
        pass


    def warmup(self, *args, **dargs):
        pass


    def drop_caches_between_iterations(self):
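        """Drops OS caches between iterations if the job requests it."""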
        if self.job.drop_caches_between_iterations:
            utils.drop_caches()


    def _call_run_once_with_retry(self, constraints, profile_only,
                                  postprocess_profiled_run, args, dargs):
247        """Thin wrapper around _call_run_once that retries unsuccessful tests.
248
249        If the job object's attribute test_retry is > 0 retry any tests that
250        ran unsuccessfully X times.
251        *Note this does not competely re-initialize the test, it only
252            re-executes code once all the initial job set up (packages,
253            sysinfo, etc) is complete.
254        """
        if self.job.test_retry != 0:
            logging.info('Test will be retried a maximum of %d times',
                         self.job.test_retry)

        max_runs = self.job.test_retry
        for retry_run in xrange(0, max_runs+1):
            try:
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
                break
            except error.TestFailRetry as err:
                if retry_run == max_runs:
                    raise
                self.job.record('INFO', None, None, 'Run %s failed with %s' % (
                        retry_run, err))
        if retry_run > 0:
            self.write_test_keyval({'test_retries_before_success': retry_run})


    def _call_run_once(self, constraints, profile_only,
                       postprocess_profiled_run, args, dargs):
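        """Runs one iteration, wrapped by the registered iteration hooks."""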
        self.drop_caches_between_iterations()
        # execute iteration hooks
        for hook in self.before_iteration_hooks:
            hook(self)

        try:
            if profile_only:
                if not self.job.profilers.present():
                    self.job.record('WARN', None, None,
                                    'No profilers have been added but '
                                    'profile_only is set - nothing '
                                    'will be run')
                self.run_once_profiling(postprocess_profiled_run,
                                        *args, **dargs)
            else:
                self.before_run_once()
                self.run_once(*args, **dargs)
                self.after_run_once()

            self.postprocess_iteration()
            self.analyze_perf_constraints(constraints)
        finally:
            for hook in self.after_iteration_hooks:
                hook(self)


    def execute(self, iterations=None, test_length=None, profile_only=None,
                _get_time=time.time, postprocess_profiled_run=None,
                constraints=(), *args, **dargs):
305        """
306        This is the basic execute method for the tests inherited from base_test.
307        If you want to implement a benchmark test, it's better to implement
308        the run_once function, to cope with the profiling infrastructure. For
309        other tests, you can just override the default implementation.
310
311        @param test_length: The minimum test length in seconds. We'll run the
312            run_once function for a number of times large enough to cover the
313            minimum test length.
314
315        @param iterations: A number of iterations that we'll run the run_once
316            function. This parameter is incompatible with test_length and will
317            be silently ignored if you specify both.
318
319        @param profile_only: If true run X iterations with profilers enabled.
320            If false run X iterations and one with profiling if profiles are
321            enabled. If None, default to the value of job.default_profile_only.
322
323        @param _get_time: [time.time] Used for unit test time injection.
324
325        @param postprocess_profiled_run: Run the postprocessing for the
326            profiled run.
327        """

        # For our special class of tests, the benchmarks, we don't want
        # profilers to run during the test iterations. Let's reserve only
        # the last iteration for profiling, if needed. So let's stop
        # all profilers if they are present and active.
        profilers = self.job.profilers
        if profilers.active():
            profilers.stop(self)
        if profile_only is None:
            profile_only = self.job.default_profile_only
        # If the user called this test in an odd way (specified both iterations
        # and test_length), let's warn them.
        if iterations and test_length:
            logging.debug('Iterations parameter ignored (timed execution)')
        if test_length:
            test_start = _get_time()
            time_elapsed = 0
            timed_counter = 0
            logging.debug('Test started. Specified %d s as the minimum test '
                          'length', test_length)
            while time_elapsed < test_length:
                timed_counter = timed_counter + 1
                if time_elapsed == 0:
                    logging.debug('Executing iteration %d', timed_counter)
                elif time_elapsed > 0:
                    logging.debug('Executing iteration %d, time_elapsed %d s',
                                  timed_counter, time_elapsed)
                self._call_run_once_with_retry(constraints, profile_only,
                                               postprocess_profiled_run, args,
                                               dargs)
                test_iteration_finish = _get_time()
                time_elapsed = test_iteration_finish - test_start
            logging.debug('Test finished after %d iterations, '
                          'time elapsed: %d s', timed_counter, time_elapsed)
        else:
            if iterations is None:
                iterations = 1
            if iterations > 1:
                logging.debug('Test started. Specified %d iterations',
                              iterations)
            for self.iteration in xrange(1, iterations + 1):
                if iterations > 1:
                    logging.debug('Executing iteration %d of %d',
                                  self.iteration, iterations)
                self._call_run_once_with_retry(constraints, profile_only,
                                               postprocess_profiled_run, args,
                                               dargs)

        if not profile_only:
            self.iteration += 1
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)

        # Do any postprocessing, normally extracting performance keyvals, etc
        self.postprocess()
        self.process_failed_constraints()


    def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
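        """Runs one iteration with profilers started, if any are present."""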
        profilers = self.job.profilers
        # Do a profiling run if necessary
        if profilers.present():
            self.drop_caches_between_iterations()
            profilers.before_start(self)

            self.before_run_once()
            profilers.start(self)
            logging.debug('Profilers present. Profiling run started')

            try:
                self.run_once(*args, **dargs)

                # Priority to the run_once() argument over the attribute.
                postprocess_attribute = getattr(self,
                                                'postprocess_profiled_run',
                                                False)

                if (postprocess_profiled_run or
                    (postprocess_profiled_run is None and
                     postprocess_attribute)):
                    self.postprocess_iteration()

            finally:
                profilers.stop(self)
                profilers.report(self)

            self.after_run_once()


    def postprocess(self):
        pass


    def postprocess_iteration(self):
        pass


    def cleanup(self):
        pass


    def before_run_once(self):
429        """
430        Override in tests that need it, will be called before any run_once()
431        call including the profiling run (when it's called before starting
432        the profilers).
433        """
434        pass
435
436
437    def after_run_once(self):
438        """
439        Called after every run_once (including from a profiled run when it's
440        called after stopping the profilers).
441        """
442        pass
443
444
445    def _exec(self, args, dargs):
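        """Top-level wrapper that actually runs the test.

        Writes the test attribute keyvals, validates the supplied arguments,
        then calls initialize(), setup() (under a lock), warmup(), execute()
        and cleanup(), translating unexpected exceptions into autotest errors.
        """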
        self.job.logging.tee_redirect_debug_dir(self.debugdir,
                                                log_name=self.tagged_testname)
        try:
            if self.network_destabilizing:
                self.job.disable_warnings("NETWORK")

            # write out the test attributes into a keyval
            dargs   = dargs.copy()
            run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
            keyvals = dargs.pop('test_attributes', {}).copy()
            keyvals['version'] = self.version
            for i, arg in enumerate(args):
                keyvals['param-%d' % i] = repr(arg)
            for name, arg in dargs.iteritems():
                keyvals['param-%s' % name] = repr(arg)
            self.write_test_keyval(keyvals)

            _validate_args(args, dargs, self.initialize, self.setup,
                           self.execute, self.cleanup)

            try:
                # Initialize:
                _cherry_pick_call(self.initialize, *args, **dargs)

                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_EX)
                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup, args, dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)
                finally:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                    lockfile.close()

                # Execute:
                os.chdir(self.outputdir)

                # call self.warmup cherry picking the arguments it accepts and
                # translate exceptions if needed
                _call_test_function(_cherry_pick_call, self.warmup,
                                    *args, **dargs)

                if hasattr(self, 'run_once'):
                    p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                        args, dargs)
                    # pull in any non-* and non-** args from self.execute
                    for param in _get_nonstar_args(self.execute):
                        if param in dargs:
                            p_dargs[param] = dargs[param]
                else:
                    p_args, p_dargs = _cherry_pick_args(self.execute,
                                                        args, dargs)

                _call_test_function(self.execute, *p_args, **p_dargs)
            except Exception:
                # Save the exception while we run our cleanup() before
                # reraising it.
                exc_info = sys.exc_info()
                try:
                    try:
                        if run_cleanup:
                            _cherry_pick_call(self.cleanup, *args, **dargs)
                    except Exception:
                        logging.error('Ignoring exception during cleanup() phase:')
                        traceback.print_exc()
                        logging.error('Now raising the earlier %s error',
                                      exc_info[0])
                    self.crash_handler_report()
                finally:
                    self.job.logging.restore()
                    try:
                        raise exc_info[0], exc_info[1], exc_info[2]
                    finally:
                        # http://docs.python.org/library/sys.html#sys.exc_info
                        # Be nice and prevent a circular reference.
                        del exc_info
            else:
                try:
                    if run_cleanup:
                        _cherry_pick_call(self.cleanup, *args, **dargs)
                    self.crash_handler_report()
                finally:
                    self.job.logging.restore()
        except error.AutotestError:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Pass already-categorized errors on up.
            raise
        except Exception, e:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Anything else is an ERROR in our own code, not execute().
            raise error.UnhandledTestError(e)
        else:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")


    def runsubtest(self, url, *args, **dargs):
547        """
548        Execute another autotest test from inside the current test's scope.
549
550        @param test: Parent test.
551        @param url: Url of new test.
552        @param tag: Tag added to test name.
553        @param args: Args for subtest.
554        @param dargs: Dictionary with args for subtest.
555        @iterations: Number of subtest iterations.
556        @profile_only: If true execute one profiled run.
557        """
558        dargs["profile_only"] = dargs.get("profile_only", False)
559        test_basepath = self.outputdir[len(self.job.resultdir + "/"):]
560        return self.job.run_test(url, master_testpath=test_basepath,
561                                 *args, **dargs)
562
563
564def _get_nonstar_args(func):
565    """Extract all the (normal) function parameter names.
566
567    Given a function, returns a tuple of parameter names, specifically
568    excluding the * and ** parameters, if the function accepts them.
569
570    @param func: A callable that we want to chose arguments for.
571
572    @return: A tuple of parameters accepted by the function.
573    """
574    return func.func_code.co_varnames[:func.func_code.co_argcount]
575
576
577def _cherry_pick_args(func, args, dargs):
578    """Sanitize positional and keyword arguments before calling a function.
579
580    Given a callable (func), an argument tuple and a dictionary of keyword
581    arguments, pick only those arguments which the function is prepared to
582    accept and return a new argument tuple and keyword argument dictionary.
583
584    Args:
585      func: A callable that we want to choose arguments for.
586      args: A tuple of positional arguments to consider passing to func.
587      dargs: A dictionary of keyword arguments to consider passing to func.
588    Returns:
589      A tuple of: (args tuple, keyword arguments dictionary)
590    """
591    # Cherry pick args:
592    if func.func_code.co_flags & 0x04:
593        # func accepts *args, so return the entire args.
594        p_args = args
595    else:
596        p_args = ()
597
598    # Cherry pick dargs:
599    if func.func_code.co_flags & 0x08:
600        # func accepts **dargs, so return the entire dargs.
601        p_dargs = dargs
602    else:
603        # Only return the keyword arguments that func accepts.
604        p_dargs = {}
605        for param in _get_nonstar_args(func):
606            if param in dargs:
607                p_dargs[param] = dargs[param]
608
609    return p_args, p_dargs
610
611
612def _cherry_pick_call(func, *args, **dargs):
613    """Cherry picks arguments from args/dargs based on what "func" accepts
614    and calls the function with the picked arguments."""
615    p_args, p_dargs = _cherry_pick_args(func, args, dargs)
616    return func(*p_args, **p_dargs)
617
618
619def _validate_args(args, dargs, *funcs):
620    """Verify that arguments are appropriate for at least one callable.
621
622    Given a list of callables as additional parameters, verify that
623    the proposed keyword arguments in dargs will each be accepted by at least
624    one of the callables.
625
626    NOTE: args is currently not supported and must be empty.
627
628    Args:
629      args: A tuple of proposed positional arguments.
630      dargs: A dictionary of proposed keyword arguments.
631      *funcs: Callables to be searched for acceptance of args and dargs.
632    Raises:
633      error.AutotestError: if an arg won't be accepted by any of *funcs.
634    """
635    all_co_flags = 0
636    all_varnames = ()
637    for func in funcs:
638        all_co_flags |= func.func_code.co_flags
639        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]
640
641    # Check if given args belongs to at least one of the methods below.
642    if len(args) > 0:
643        # Current implementation doesn't allow the use of args.
644        raise error.TestError('Unnamed arguments not accepted. Please '
645                              'call job.run_test with named args only')
646
647    # Check if given dargs belongs to at least one of the methods below.
648    if len(dargs) > 0:
649        if not all_co_flags & 0x08:
650            # no func accepts *dargs, so:
651            for param in dargs:
652                if not param in all_varnames:
653                    raise error.AutotestError('Unknown parameter: %s' % param)
654
655
656def _installtest(job, url):
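    """Downloads and unpacks a test package into <testdir>/download.

    Fetches and untars the package only if it is not already installed, and
    returns the (group, name) tuple for the test.
    """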
    (group, name) = job.pkgmgr.get_package_name(url, 'test')

    # Bail if the test is already installed
    group_dir = os.path.join(job.testdir, "download", group)
    if os.path.exists(os.path.join(group_dir, name)):
        return (group, name)

    # If the group directory is missing, create it and add an empty
    # __init__.py so that sub-directories are considered for import.
    if not os.path.exists(group_dir):
        os.makedirs(group_dir)
        f = file(os.path.join(group_dir, '__init__.py'), 'w+')
        f.close()

    logging.debug("%s: installing test url=%s", name, url)
    tarball = os.path.basename(url)
    tarball_path = os.path.join(group_dir, tarball)
    test_dir = os.path.join(group_dir, name)
    job.pkgmgr.fetch_pkg(tarball, tarball_path,
                         repo_url=os.path.dirname(url))

    # Create the directory for the test
    if not os.path.exists(test_dir):
        os.mkdir(os.path.join(group_dir, name))

    job.pkgmgr.untar_pkg(tarball_path, test_dir)

    os.remove(tarball_path)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(group_dir, name,
                            '__init__.py'))

    # The test is now installed.
    return (group, name)


def _call_test_function(func, *args, **dargs):
    """Calls a test function and translates exceptions so that errors
    inside test code are considered test failures."""
    try:
        return func(*args, **dargs)
    except error.AutotestError:
        raise
    except Exception, e:
        # Other exceptions must be treated as a FAIL when
        # raised during the test functions
        raise error.UnhandledTestFail(e)


def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={},
            before_test_hook=None, after_test_hook=None,
            before_iteration_hook=None, after_iteration_hook=None):
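    """Resolves, instantiates and runs a test, then cleans up after it.

    Installs the test from a tarball URL or locates it under testdir /
    site_testdir, imports the test module, constructs the test object in its
    output directory, registers any iteration hooks passed in, and runs the
    test via _exec() with the supplied args/dargs, calling the before/after
    test hooks around the run.
    """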
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()
    # if this is not a plain test name then download and install the
    # specified test
    if url.endswith('.tar.bz2'):
        (testgroup, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', testgroup, testname)
        importdir = os.path.join(job.testdir, 'download')
        site_bindir = None
        modulename = '%s.%s' % (re.sub('/', '.', testgroup), testname)
        classname = '%s.%s' % (modulename, testname)
        path = testname
    else:
        # If the test is local, it may be under either testdir or site_testdir.
        # Tests in site_testdir override tests defined in testdir
        testname = path = url
        testgroup = ''
        path = re.sub(':', '/', testname)
        modulename = os.path.basename(path)
        classname = '%s.%s' % (modulename, modulename)

        # Try installing the test package
        # The job object may be either a server side job or a client side job.
        # 'install_pkg' method will be present only if it's a client side job.
        if hasattr(job, 'install_pkg'):
            try:
                bindir = os.path.join(job.testdir, testname)
                job.install_pkg(testname, 'test', bindir)
            except error.PackageInstallError, e:
                # continue as a fall back mechanism and see if the test code
                # already exists on the machine
                pass

        bindir = testdir = None
        for dir in [job.testdir, getattr(job, 'site_testdir', None)]:
            if dir is not None and os.path.exists(os.path.join(dir, path)):
                testdir = dir
                importdir = bindir = os.path.join(dir, path)
        if not bindir:
            raise error.TestError(testname + ': test does not exist')

    subdir = os.path.join(dargs.pop('master_testpath', ""), testname)
    outputdir = os.path.join(job.resultdir, subdir)
    if tag:
        outputdir += '.' + tag

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    sys.path.insert(0, importdir)
    try:
        exec ('import %s' % modulename, local_namespace, global_namespace)
        exec ("mytest = %s(job, bindir, outputdir)" % classname,
              local_namespace, global_namespace)
    finally:
        sys.path.pop(0)

    pwd = os.getcwd()
    os.chdir(outputdir)

    try:
        mytest = global_namespace['mytest']
        if before_test_hook:
            before_test_hook(mytest)

        # use the register_*_iteration_hook methods to register the
        # passed-in hooks
        if before_iteration_hook:
            mytest.register_before_iteration_hook(before_iteration_hook)
        if after_iteration_hook:
            mytest.register_after_iteration_hook(after_iteration_hook)
        mytest._exec(args, dargs)
    finally:
        os.chdir(pwd)
        if after_test_hook:
            after_test_hook(mytest)
        shutil.rmtree(mytest.tmpdir, ignore_errors=True)