# test.py revision 7f24f0bfe69b2f36ba74350fba9ad547ac55571c
# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialise
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       run             run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       src             eg. tests/<test>/src
#       tmpdir          eg. tmp/<tempname>_<testname.tag>

#pylint: disable-msg=C0111

import fcntl, json, os, re, sys, shutil, tempfile, time, traceback
import logging

from autotest_lib.client.common_lib import error
from autotest_lib.client.bin import utils


class base_test(object):
    preserve_srcdir = False
    network_destabilizing = False

    def __init__(self, job, bindir, outputdir):
        self.job = job
        self.pkgmgr = job.pkgmgr
        self.autodir = job.autodir
        self.outputdir = outputdir
        self.tagged_testname = os.path.basename(self.outputdir)
        self.resultsdir = os.path.join(self.outputdir, 'results')
        os.mkdir(self.resultsdir)
        self.profdir = os.path.join(self.outputdir, 'profiling')
        os.mkdir(self.profdir)
        self.debugdir = os.path.join(self.outputdir, 'debug')
        os.mkdir(self.debugdir)
        # TODO(ericli): figure out how the autotest crash handler works with
        # the cros crash handler; we should restore it in the near term.
        # Once this is re-enabled, import getpass. crosbug.com/31232
        # if getpass.getuser() == 'root':
        #     self.configure_crash_handler()
        # else:
        self.crash_handling_enabled = False
        self.bindir = bindir
        self.srcdir = os.path.join(self.bindir, 'src')
        self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname,
                                       dir=job.tmpdir)
        self._keyvals = []
        self._new_keyval = False
        self.failed_constraints = []
        self.iteration = 0
        self.before_iteration_hooks = []
        self.after_iteration_hooks = []


    def configure_crash_handler(self):
        pass


    def crash_handler_report(self):
        pass


    def assert_(self, expr, msg='Assertion failed.'):
        if not expr:
            raise error.TestError(msg)


    def write_test_keyval(self, attr_dict):
        utils.write_keyval(self.outputdir, attr_dict,
                           tap_report=self.job._tap)

    @staticmethod
    def _append_type_to_keys(dictionary, typename):
        new_dict = {}
        for key, value in dictionary.iteritems():
            new_key = "%s{%s}" % (key, typename)
            new_dict[new_key] = value
        return new_dict


    def output_perf_value(self, description, value, units,
                          higher_is_better=True, graph=None):
        """
        Records a measured performance value in an output file.

        The output file will subsequently be parsed by the TKO parser to have
        the information inserted into the results database.

        @param description: A string describing the measured perf value. Must
                be maximum length 256, and may only contain letters, numbers,
                periods, dashes, and underscores.  For example:
                "page_load_time", "scrolling-frame-rate".
        @param value: A number representing the measured perf value, or a list
                of measured values if a test takes multiple measurements.
                Measured perf values can be either ints or floats.
        @param units: A string describing the units associated with the
                measured perf value. Must be maximum length 32, and may only
                contain letters, numbers, periods, dashes, and underscores.
                For example: "msec", "fps", "score", "runs_per_second".
        @param higher_is_better: A boolean indicating whether or not a "higher"
                measured perf value is considered to be better. If False, it is
                assumed that a "lower" measured value is considered to be
                better.
        @param graph: A string indicating the name of the graph on which
                      the perf value will subsequently be displayed on
                      the chrome perf dashboard.
                      This allows multiple metrics to be grouped together
                      on the same graph. Defaults to None, indicating
                      that the perf value should be displayed individually
                      on a separate graph.

        """
        if len(description) > 256:
            raise ValueError('The description must be at most 256 characters.')
        if len(units) > 32:
            raise ValueError('The units must be at most 32 characters.')
        string_regex = re.compile(r'^[-\.\w]+$')
        if (not string_regex.search(description) or
            not string_regex.search(units)):
            raise ValueError('Invalid description or units string. May only '
                             'contain letters, numbers, periods, dashes, and '
                             'underscores.')

        entry = {
            'description': description,
            'value': value,
            'units': units,
            'higher_is_better': higher_is_better,
            'graph': graph
        }

        output_path = os.path.join(self.resultsdir, 'perf_measurements')
        with open(output_path, 'a') as fp:
            fp.write(json.dumps(entry, sort_keys=True) + '\n')
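        # Illustrative example (hypothetical metric and values): a call such as
        #   self.output_perf_value('page_load_time', 2.85, 'sec',
        #                          higher_is_better=False)
        # appends one JSON line like the following to
        # <resultsdir>/perf_measurements:
        #   {"description": "page_load_time", "graph": null,
        #    "higher_is_better": false, "units": "sec", "value": 2.85}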


    def write_perf_keyval(self, perf_dict):
        self.write_iteration_keyval({}, perf_dict,
                                    tap_report=self.job._tap)


    def write_attr_keyval(self, attr_dict):
        self.write_iteration_keyval(attr_dict, {},
                                    tap_report=self.job._tap)


    def write_iteration_keyval(self, attr_dict, perf_dict, tap_report=None):
        # append the dictionaries before they have the {perf} and {attr} added
        self._keyvals.append({'attr': attr_dict, 'perf': perf_dict})
        self._new_keyval = True

        if attr_dict:
            attr_dict = self._append_type_to_keys(attr_dict, "attr")
            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr",
                               tap_report=tap_report)

        if perf_dict:
            perf_dict = self._append_type_to_keys(perf_dict, "perf")
            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf",
                               tap_report=tap_report)

        keyval_path = os.path.join(self.resultsdir, "keyval")
        print >> open(keyval_path, "a"), ""
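        # Illustrative sketch (hypothetical key and value): for an iteration
        # recorded with write_perf_keyval({'throughput_mb_per_sec': 97.3}),
        # _append_type_to_keys() rewrites the key as
        # 'throughput_mb_per_sec{perf}' before handing it to
        # utils.write_keyval(), and the blank line printed above separates one
        # iteration's keyvals from the next in the results keyval file.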


    def analyze_perf_constraints(self, constraints):
        if not self._new_keyval:
            return

        # create a dict from the keyvals suitable as an environment for eval
        keyval_env = self._keyvals[-1]['perf'].copy()
        keyval_env['__builtins__'] = None
        self._new_keyval = False
        failures = []

        # evaluate each constraint using the current keyvals
        for constraint in constraints:
            logging.info('___________________ constraint = %s', constraint)
            logging.info('___________________ keyvals = %s', keyval_env)

            try:
                if not eval(constraint, keyval_env):
                    failures.append('%s: constraint was not met' % constraint)
            except:
                failures.append('could not evaluate constraint: %s'
                                % constraint)

        # keep track of the errors for each iteration
        self.failed_constraints.append(failures)
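        # Illustrative example (hypothetical keyval names): with constraints
        # such as ('throughput_mb_per_sec > 50', 'latency_ms < 10'), each
        # string is eval()'d against the perf keyvals of the latest iteration,
        # and any expression that is false or fails to evaluate is recorded as
        # a failure for that iteration.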


    def process_failed_constraints(self):
        msg = ''
        for i, failures in enumerate(self.failed_constraints):
            if failures:
                msg += 'iteration %d:%s  ' % (i, ','.join(failures))

        if msg:
            raise error.TestFail(msg)


    def register_before_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register a before_iteration_hook.
        This adds the method to the list of hooks which are executed
        before each iteration.

        @param iteration_hook: Method to run before each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.before_iteration_hooks.append(iteration_hook)


    def register_after_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register an after_iteration_hook.
        This adds the method to the list of hooks which are executed
        after each iteration.

        @param iteration_hook: Method to run after each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.after_iteration_hooks.append(iteration_hook)
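        # Illustrative example (hypothetical hook): any callable that takes
        # the test object can be registered, e.g.
        #   def collect_meminfo(test):
        #       shutil.copy('/proc/meminfo', test.resultsdir)
        #   test.register_after_iteration_hook(collect_meminfo)
        # runtest() below uses these methods to register hooks passed in by
        # the caller.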


    def initialize(self):
        pass


    def setup(self):
        pass


    def warmup(self, *args, **dargs):
        pass


    def drop_caches_between_iterations(self):
        if self.job.drop_caches_between_iterations:
            utils.drop_caches()


    def _call_run_once_with_retry(self, constraints, profile_only,
                                  postprocess_profiled_run, args, dargs):
        """Thin wrapper around _call_run_once that retries unsuccessful tests.

        If the job object's test_retry attribute is greater than 0, any run
        that raises error.TestFailRetry is retried up to that many times.
        *Note this does not completely re-initialize the test; it only
            re-executes code once all the initial job setup (packages,
            sysinfo, etc) is complete.
        """
        if self.job.test_retry != 0:
            logging.info('Test will be retried a maximum of %d times',
                         self.job.test_retry)

        max_runs = self.job.test_retry
        for retry_run in xrange(0, max_runs+1):
            try:
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
                break
            except error.TestFailRetry as err:
                if retry_run == max_runs:
                    raise
                self.job.record('INFO', None, None, 'Run %s failed with %s' % (
                        retry_run, err))
        if retry_run > 0:
            self.write_test_keyval({'test_retries_before_success': retry_run})
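        # Illustrative example (hypothetical numbers): with job.test_retry = 2,
        # a run_once() that raises error.TestFailRetry twice and then passes
        # results in three runs and a 'test_retries_before_success' keyval of 2.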


    def _call_run_once(self, constraints, profile_only,
                       postprocess_profiled_run, args, dargs):
        self.drop_caches_between_iterations()
        # execute iteration hooks
        for hook in self.before_iteration_hooks:
            hook(self)

        try:
            if profile_only:
                if not self.job.profilers.present():
                    self.job.record('WARN', None, None,
                                    'No profilers have been added but '
                                    'profile_only is set - nothing '
                                    'will be run')
                self.run_once_profiling(postprocess_profiled_run,
                                        *args, **dargs)
            else:
                self.before_run_once()
                self.run_once(*args, **dargs)
                self.after_run_once()

            self.postprocess_iteration()
            self.analyze_perf_constraints(constraints)
        finally:
            for hook in self.after_iteration_hooks:
                hook(self)


    def execute(self, iterations=None, test_length=None, profile_only=None,
                _get_time=time.time, postprocess_profiled_run=None,
                constraints=(), *args, **dargs):
        """
        This is the basic execute method for the tests inherited from base_test.
        If you want to implement a benchmark test, it's better to implement
        the run_once function, so that it cooperates with the profiling
        infrastructure. For other tests, you can just override the default
        implementation.

        @param test_length: The minimum test length in seconds. We'll run the
            run_once function for a number of times large enough to cover the
            minimum test length.

        @param iterations: A number of iterations that we'll run the run_once
            function. This parameter is incompatible with test_length and will
            be silently ignored if you specify both.

        @param profile_only: If True, run all iterations with profilers
            enabled. If False, run all iterations normally and add one extra
            profiled run if profilers are present. If None, default to the
            value of job.default_profile_only.

        @param _get_time: [time.time] Used for unit test time injection.

        @param postprocess_profiled_run: Run the postprocessing for the
            profiled run.
        """

        # For our special class of tests, the benchmarks, we don't want
        # profilers to run during the test iterations. Let's reserve only
        # the last iteration for profiling, if needed. So let's stop
        # all profilers if they are present and active.
        profilers = self.job.profilers
        if profilers.active():
            profilers.stop(self)
        if profile_only is None:
            profile_only = self.job.default_profile_only
        # If the user called this test in an odd way (specified both iterations
        # and test_length), let's warn them.
        if iterations and test_length:
            logging.debug('Iterations parameter ignored (timed execution)')
        if test_length:
            test_start = _get_time()
            time_elapsed = 0
            timed_counter = 0
            logging.debug('Test started. Specified %d s as the minimum test '
                          'length', test_length)
            while time_elapsed < test_length:
                timed_counter = timed_counter + 1
                if time_elapsed == 0:
                    logging.debug('Executing iteration %d', timed_counter)
                elif time_elapsed > 0:
                    logging.debug('Executing iteration %d, time_elapsed %d s',
                                  timed_counter, time_elapsed)
                self._call_run_once_with_retry(constraints, profile_only,
                                               postprocess_profiled_run, args,
                                               dargs)
                test_iteration_finish = _get_time()
                time_elapsed = test_iteration_finish - test_start
            logging.debug('Test finished after %d iterations, '
                          'time elapsed: %d s', timed_counter, time_elapsed)
        else:
            if iterations is None:
                iterations = 1
            if iterations > 1:
                logging.debug('Test started. Specified %d iterations',
                              iterations)
            for self.iteration in xrange(1, iterations + 1):
                if iterations > 1:
                    logging.debug('Executing iteration %d of %d',
                                  self.iteration, iterations)
                self._call_run_once_with_retry(constraints, profile_only,
                                               postprocess_profiled_run, args,
                                               dargs)

        if not profile_only:
            self.iteration += 1
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)

        # Do any postprocessing, normally extracting performance keyvals, etc
        self.postprocess()
        self.process_failed_constraints()
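        # Illustrative usage sketch (hypothetical test name and values): a
        # control file would normally drive this via something like
        #   job.run_test('mybenchmark', iterations=5,
        #                constraints=['throughput_mb_per_sec > 50'])
        # rather than calling execute() directly; the keyword arguments are
        # forwarded to this method through _exec() below.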


    def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
        profilers = self.job.profilers
        # Do a profiling run if necessary
        if profilers.present():
            self.drop_caches_between_iterations()
            profilers.before_start(self)

            self.before_run_once()
            profilers.start(self)
            logging.debug('Profilers present. Profiling run started')

            try:
                self.run_once(*args, **dargs)

                # Give priority to the run_once() argument over the attribute.
                postprocess_attribute = getattr(self,
                                                'postprocess_profiled_run',
                                                False)

                if (postprocess_profiled_run or
                    (postprocess_profiled_run is None and
                     postprocess_attribute)):
                    self.postprocess_iteration()

            finally:
                profilers.stop(self)
                profilers.report(self)

            self.after_run_once()


    def postprocess(self):
        pass


    def postprocess_iteration(self):
        pass


    def cleanup(self):
        pass


    def before_run_once(self):
        """
        Override in tests that need it, will be called before any run_once()
        call including the profiling run (when it's called before starting
        the profilers).
        """
        pass


    def after_run_once(self):
        """
        Called after every run_once (including from a profiled run when it's
        called after stopping the profilers).
        """
        pass


    def _exec(self, args, dargs):
        self.job.logging.tee_redirect_debug_dir(self.debugdir,
                                                log_name=self.tagged_testname)
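        # Overall flow below: write the test attribute keyvals, validate
        # dargs, then run initialize(), setup() (while holding an exclusive
        # flock on .testlock in job.tmpdir), warmup(), execute()/run_once(),
        # and finally cleanup() when run_cleanup is set.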
        try:
            if self.network_destabilizing:
                self.job.disable_warnings("NETWORK")

            # write out the test attributes into a keyval
            dargs = dargs.copy()
            run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
            keyvals = dargs.pop('test_attributes', {}).copy()
            keyvals['version'] = self.version
            for i, arg in enumerate(args):
                keyvals['param-%d' % i] = repr(arg)
            for name, arg in dargs.iteritems():
                keyvals['param-%s' % name] = repr(arg)
            self.write_test_keyval(keyvals)

            _validate_args(args, dargs, self.initialize, self.setup,
                           self.execute, self.cleanup)

            try:
                # Initialize:
                _cherry_pick_call(self.initialize, *args, **dargs)

                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_EX)
                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup, args, dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)
                finally:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                    lockfile.close()

                # Execute:
                os.chdir(self.outputdir)

                # call self.warmup cherry picking the arguments it accepts and
                # translate exceptions if needed
                _call_test_function(_cherry_pick_call, self.warmup,
                                    *args, **dargs)

                if hasattr(self, 'run_once'):
                    p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                        args, dargs)
                    # pull in any non-* and non-** args from self.execute
                    for param in _get_nonstar_args(self.execute):
                        if param in dargs:
                            p_dargs[param] = dargs[param]
                else:
                    p_args, p_dargs = _cherry_pick_args(self.execute,
                                                        args, dargs)

                _call_test_function(self.execute, *p_args, **p_dargs)
            except Exception:
                # Save the exception while we run our cleanup() before
                # reraising it.
                exc_info = sys.exc_info()
                try:
                    try:
                        if run_cleanup:
                            _cherry_pick_call(self.cleanup, *args, **dargs)
                    except Exception:
                        logging.error('Ignoring exception during cleanup() phase:')
                        traceback.print_exc()
                        logging.error('Now raising the earlier %s error',
                                      exc_info[0])
                    self.crash_handler_report()
                finally:
                    self.job.logging.restore()
                    try:
                        raise exc_info[0], exc_info[1], exc_info[2]
                    finally:
                        # http://docs.python.org/library/sys.html#sys.exc_info
                        # Be nice and prevent a circular reference.
                        del exc_info
            else:
                try:
                    if run_cleanup:
                        _cherry_pick_call(self.cleanup, *args, **dargs)
                    self.crash_handler_report()
                finally:
                    self.job.logging.restore()
        except error.AutotestError:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Pass already-categorized errors on up.
            raise
        except Exception, e:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")
            # Anything else is an ERROR in our own code, not execute().
            raise error.UnhandledTestError(e)
        else:
            if self.network_destabilizing:
                self.job.enable_warnings("NETWORK")


    def runsubtest(self, url, *args, **dargs):
        """
        Execute another autotest test from inside the current test's scope.
        The current test acts as the parent of the subtest.

        @param url: Url of the new test.
        @param tag: Tag added to test name.
        @param args: Args for subtest.
        @param dargs: Dictionary with args for subtest.
        @param iterations: Number of subtest iterations.
        @param profile_only: If true execute one profiled run.
        """
        dargs["profile_only"] = dargs.get("profile_only", False)
        test_basepath = self.outputdir[len(self.job.resultdir + "/"):]
        return self.job.run_test(url, master_testpath=test_basepath,
                                 *args, **dargs)
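        # Illustrative sketch (hypothetical subtest name): from inside
        # run_once() a test might call
        #   self.runsubtest('somesubtest', iterations=2)
        # which re-enters job.run_test() with master_testpath pointing at this
        # test's output directory, so the subtest's results nest under it.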


def _get_nonstar_args(func):
    """Extract all the (normal) function parameter names.

    Given a function, returns a tuple of parameter names, specifically
    excluding the * and ** parameters, if the function accepts them.

    @param func: A callable that we want to choose arguments for.

    @return: A tuple of parameters accepted by the function.
    """
    return func.func_code.co_varnames[:func.func_code.co_argcount]


def _cherry_pick_args(func, args, dargs):
    """Sanitize positional and keyword arguments before calling a function.

    Given a callable (func), an argument tuple and a dictionary of keyword
    arguments, pick only those arguments which the function is prepared to
    accept and return a new argument tuple and keyword argument dictionary.

    Args:
      func: A callable that we want to choose arguments for.
      args: A tuple of positional arguments to consider passing to func.
      dargs: A dictionary of keyword arguments to consider passing to func.
    Returns:
      A tuple of: (args tuple, keyword arguments dictionary)
    """
    # Cherry pick args:
    if func.func_code.co_flags & 0x04:
        # func accepts *args, so return the entire args.
        p_args = args
    else:
        p_args = ()

    # Cherry pick dargs:
    if func.func_code.co_flags & 0x08:
        # func accepts **dargs, so return the entire dargs.
        p_dargs = dargs
    else:
        # Only return the keyword arguments that func accepts.
        p_dargs = {}
        for param in _get_nonstar_args(func):
            if param in dargs:
                p_dargs[param] = dargs[param]

    return p_args, p_dargs
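    # Illustrative example (hypothetical function): for
    #   def run_once(self, iterations=1, rounds=10): ...
    # a call to _cherry_pick_args(run_once, (), {'rounds': 3, 'unused': 7})
    # returns ((), {'rounds': 3}) -- positional args are dropped because
    # run_once has no *args, and only keywords it actually accepts are kept.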


def _cherry_pick_call(func, *args, **dargs):
    """Cherry picks arguments from args/dargs based on what "func" accepts
    and calls the function with the picked arguments."""
    p_args, p_dargs = _cherry_pick_args(func, args, dargs)
    return func(*p_args, **p_dargs)


def _validate_args(args, dargs, *funcs):
    """Verify that arguments are appropriate for at least one callable.

    Given a list of callables as additional parameters, verify that
    the proposed keyword arguments in dargs will each be accepted by at least
    one of the callables.

    NOTE: args is currently not supported and must be empty.

    Args:
      args: A tuple of proposed positional arguments.
      dargs: A dictionary of proposed keyword arguments.
      *funcs: Callables to be searched for acceptance of args and dargs.
    Raises:
      error.AutotestError: if an arg won't be accepted by any of *funcs.
    """
    all_co_flags = 0
    all_varnames = ()
    for func in funcs:
        all_co_flags |= func.func_code.co_flags
        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]

    # Check if the given args belong to at least one of the given callables.
    if len(args) > 0:
        # Current implementation doesn't allow the use of args.
        raise error.TestError('Unnamed arguments not accepted. Please '
                              'call job.run_test with named args only')

    # Check if the given dargs belong to at least one of the given callables.
    if len(dargs) > 0:
        if not all_co_flags & 0x08:
            # no func accepts **dargs, so:
            for param in dargs:
                if not param in all_varnames:
                    raise error.AutotestError('Unknown parameter: %s' % param)
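    # Illustrative example (hypothetical keyword): if a control file passes
    # job.run_test('sometest', foo=1) and none of initialize/setup/execute/
    # cleanup accepts a 'foo' parameter (or **dargs), this raises
    # 'Unknown parameter: foo'.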


def _installtest(job, url):
    (group, name) = job.pkgmgr.get_package_name(url, 'test')

    # Bail if the test is already installed
    group_dir = os.path.join(job.testdir, "download", group)
    if os.path.exists(os.path.join(group_dir, name)):
        return (group, name)

    # If the group directory is missing create it and add
    # an empty __init__.py so that sub-directories are
    # considered for import.
    if not os.path.exists(group_dir):
        os.makedirs(group_dir)
        f = file(os.path.join(group_dir, '__init__.py'), 'w+')
        f.close()

    logging.debug("%s: installing test url=%s", name, url)
    tarball = os.path.basename(url)
    tarball_path = os.path.join(group_dir, tarball)
    test_dir = os.path.join(group_dir, name)
    job.pkgmgr.fetch_pkg(tarball, tarball_path,
                         repo_url=os.path.dirname(url))

    # Create the directory for the test
    if not os.path.exists(test_dir):
        os.mkdir(os.path.join(group_dir, name))

    job.pkgmgr.untar_pkg(tarball_path, test_dir)

    os.remove(tarball_path)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(group_dir, name,
                            '__init__.py'))

    # The test is now installed.
    return (group, name)


def _call_test_function(func, *args, **dargs):
    """Calls a test function and translates exceptions so that errors
    inside test code are considered test failures."""
    try:
        return func(*args, **dargs)
    except error.AutotestError:
        raise
    except Exception, e:
        # Other exceptions must be treated as a FAIL when
        # raised during the test functions
        raise error.UnhandledTestFail(e)


def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={},
            before_test_hook=None, after_test_hook=None,
            before_iteration_hook=None, after_iteration_hook=None):
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()
    # if this is not a plain test name then download and install the
    # specified test
    if url.endswith('.tar.bz2'):
        (testgroup, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', testgroup, testname)
        importdir = os.path.join(job.testdir, 'download')
        site_bindir = None
        modulename = '%s.%s' % (re.sub('/', '.', testgroup), testname)
        classname = '%s.%s' % (modulename, testname)
        path = testname
    else:
        # If the test is local, it may be under either testdir or site_testdir.
        # Tests in site_testdir override tests defined in testdir
        testname = path = url
        testgroup = ''
        path = re.sub(':', '/', testname)
        modulename = os.path.basename(path)
        classname = '%s.%s' % (modulename, modulename)

        # Try installing the test package
        # The job object may be either a server side job or a client side job.
        # 'install_pkg' method will be present only if it's a client side job.
        if hasattr(job, 'install_pkg'):
            try:
                bindir = os.path.join(job.testdir, testname)
                job.install_pkg(testname, 'test', bindir)
            except error.PackageInstallError, e:
                # continue as a fall back mechanism and see if the test code
                # already exists on the machine
                pass

        bindir = testdir = None
        for dir in [job.testdir, getattr(job, 'site_testdir', None)]:
            if dir is not None and os.path.exists(os.path.join(dir, path)):
                testdir = dir
                importdir = bindir = os.path.join(dir, path)
        if not bindir:
            raise error.TestError(testname + ': test does not exist')

    subdir = os.path.join(dargs.pop('master_testpath', ""), testname)
    outputdir = os.path.join(job.resultdir, subdir)
    if tag:
        outputdir += '.' + tag

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    sys.path.insert(0, importdir)
    try:
        exec ('import %s' % modulename, local_namespace, global_namespace)
        exec ("mytest = %s(job, bindir, outputdir)" % classname,
              local_namespace, global_namespace)
    finally:
        sys.path.pop(0)

    pwd = os.getcwd()
    os.chdir(outputdir)

    try:
        mytest = global_namespace['mytest']
        if before_test_hook:
            before_test_hook(mytest)

        # use the hook registration methods to register the hooks that were
        # passed in
        if before_iteration_hook:
            mytest.register_before_iteration_hook(before_iteration_hook)
        if after_iteration_hook:
            mytest.register_after_iteration_hook(after_iteration_hook)
        mytest._exec(args, dargs)
    finally:
        os.chdir(pwd)
        if after_test_hook:
            after_test_hook(mytest)
        shutil.rmtree(mytest.tmpdir, ignore_errors=True)