# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialise
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       run             run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       src             eg. tests/<test>/src
#       tmpdir          eg. tmp/<tempname>_<testname.tag>
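#
# A minimal sketch of a test built on top of this class (the test name
# 'platform_Example' and its metric are hypothetical, not part of this
# module):
#
#     from autotest_lib.client.bin import test
#
#     class platform_Example(test.test):
#         version = 1
#
#         def run_once(self):
#             self.write_perf_keyval({'example_seconds': 1.5})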

#pylint: disable=C0111

import fcntl
import json
import logging
import os
import re
import shutil
import stat
import sys
import tempfile
import time
import traceback

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils as client_utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = client_utils.metrics_mock


class base_test(object):
    preserve_srcdir = False

    def __init__(self, job, bindir, outputdir):
        self.job = job
        self.pkgmgr = job.pkgmgr
        self.autodir = job.autodir
        self.outputdir = outputdir
        self.tagged_testname = os.path.basename(self.outputdir)
        self.resultsdir = os.path.join(self.outputdir, 'results')
        os.mkdir(self.resultsdir)
        self.profdir = os.path.join(self.outputdir, 'profiling')
        os.mkdir(self.profdir)
        self.debugdir = os.path.join(self.outputdir, 'debug')
        os.mkdir(self.debugdir)
        # TODO(ericli): figure out how the autotest crash handler works with
        # the cros crash handler; we should restore it in the near term.
        # Once this is re-enabled, import getpass. crosbug.com/31232
        # if getpass.getuser() == 'root':
        #     self.configure_crash_handler()
        # else:
        self.crash_handling_enabled = False
        self.bindir = bindir
        self.srcdir = os.path.join(self.bindir, 'src')
        self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname,
                                       dir=job.tmpdir)
        self._keyvals = []
        self._new_keyval = False
        self.failed_constraints = []
        self.iteration = 0
        self.before_iteration_hooks = []
        self.after_iteration_hooks = []

        # Flag to indicate if the test has succeeded or failed.
        self.success = False


    def configure_crash_handler(self):
        pass


    def crash_handler_report(self):
        pass


    def assert_(self, expr, msg='Assertion failed.'):
        if not expr:
            raise error.TestError(msg)


    def write_test_keyval(self, attr_dict):
        utils.write_keyval(self.outputdir, attr_dict)


    @staticmethod
    def _append_type_to_keys(dictionary, typename):
        new_dict = {}
        for key, value in dictionary.iteritems():
            new_key = "%s{%s}" % (key, typename)
            new_dict[new_key] = value
        return new_dict


    def output_perf_value(self, description, value, units=None,
                          higher_is_better=None, graph=None,
                          replacement='_', replace_existing_values=False):
        """
        Records a measured performance value in an output file.

        The output file will subsequently be parsed by the TKO parser to have
        the information inserted into the results database.

        @param description: A string describing the measured perf value. Must
                be at most 256 characters long, and may only contain letters,
                numbers, periods, dashes, and underscores.  For example:
                "page_load_time", "scrolling-frame-rate".
        @param value: A number representing the measured perf value, or a list
                of measured values if a test takes multiple measurements.
                Measured perf values can be either ints or floats.
        @param units: A string describing the units associated with the
                measured perf value. Must be at most 32 characters long, and
                may only contain letters, numbers, periods, dashes, and
                underscores. For example: "msec", "fps", "score",
                "runs_per_second".
        @param higher_is_better: A boolean indicating whether or not a "higher"
                measured perf value is considered to be better. If False, it is
                assumed that a "lower" measured value is considered to be
                better. This impacts dashboard plotting and email notification.
                Pure autotests are expected to specify either True or False!
                This value can be set to "None" to indicate that the perf
                dashboard should apply the rules encoded via Chromium
                unit-info.json. This is only used for tracking Chromium based
                tests (in particular telemetry).
        @param graph: A string indicating the name of the graph on which
                the perf value will subsequently be displayed on the chrome
                perf dashboard. This allows multiple metrics to be grouped
                together on the same graph. Defaults to None, indicating that
                the perf value should be displayed individually on a separate
                graph.
        @param replacement: String to replace illegal characters in
                |description| and |units| with.
        @param replace_existing_values: A boolean indicating whether or not a
                newly added perf value should replace existing perf values.
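
        Example (an illustrative sketch; the metric names and values below
        are hypothetical):

            self.output_perf_value('page_load_time', 123.4, units='msec',
                                   higher_is_better=False)
            self.output_perf_value('scrolling-frame-rate',
                                   [58.9, 60.0, 59.5], units='fps',
                                   higher_is_better=True, graph='smoothness')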
        """
        if len(description) > 256:
            raise ValueError('The description must be at most 256 characters.')
        if units and len(units) > 32:
            raise ValueError('The units must be at most 32 characters.')

        # If |replacement| is legal, replace illegal characters with it.
        string_regex = re.compile(r'[^-\.\w]')
        if replacement is None or re.search(string_regex, replacement):
            raise ValueError('Invalid replacement string to mask illegal '
                             'characters. May only contain letters, numbers, '
                             'periods, dashes, and underscores. '
                             'replacement: %s' % replacement)
        description = re.sub(string_regex, replacement, description)
        units = re.sub(string_regex, replacement, units) if units else None

        charts = {}
        output_file = os.path.join(self.resultsdir, 'results-chart.json')
        if os.path.isfile(output_file):
            with open(output_file, 'r') as fp:
                contents = fp.read()
                if contents:
                    charts = json.loads(contents)

        if graph:
            first_level = graph
            second_level = description
        else:
            first_level = description
            second_level = 'summary'

        direction = 'up' if higher_is_better else 'down'

        # All input should be numbers, but at times strings representing
        # numbers are logged; attempt to convert them to numbers. If a
        # non-numeric string is logged an exception will be raised.
        if isinstance(value, list):
            value = map(float, value)
        else:
            value = float(value)

        result_type = 'scalar'
        value_key = 'value'
        result_value = value

        # The chart json spec go/telemetry-json differentiates between a
        # single value and a list of values. Lists of values get extra
        # processing in the chromeperf dashboard (mean, standard deviation,
        # etc). Tests can log one or more values for the same metric. To
        # adhere strictly to the specification, the first value logged is a
        # scalar, but if another value is logged the results become a list of
        # scalars.
        # TODO: Figure out if there would be any difference in always using a
        # list of scalars, even if there is just one item in the list.
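
        # For reference, the resulting results-chart.json has roughly this
        # shape (an illustrative sketch; the metric name and values are
        # hypothetical):
        #
        #     {
        #       "page_load_time": {
        #         "summary": {
        #           "type": "list_of_scalar_values",
        #           "units": "msec",
        #           "values": [123.4, 130.1],
        #           "improvement_direction": "down"
        #         }
        #       }
        #     }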
        if isinstance(value, list):
            result_type = 'list_of_scalar_values'
            value_key = 'values'
            if first_level in charts and second_level in charts[first_level]:
                if 'values' in charts[first_level][second_level]:
                    result_value = charts[first_level][second_level]['values']
                elif 'value' in charts[first_level][second_level]:
                    result_value = [charts[first_level][second_level]['value']]
                if replace_existing_values:
                    result_value = value
                else:
                    result_value.extend(value)
            else:
                result_value = value
        elif (first_level in charts and second_level in charts[first_level] and
              not replace_existing_values):
            result_type = 'list_of_scalar_values'
            value_key = 'values'
            if 'values' in charts[first_level][second_level]:
                result_value = charts[first_level][second_level]['values']
                result_value.append(value)
            else:
                result_value = [charts[first_level][second_level]['value'],
                                value]

        test_data = {
            second_level: {
                'type': result_type,
                'units': units,
                value_key: result_value,
                'improvement_direction': direction
            }
        }

        if first_level in charts:
            charts[first_level].update(test_data)
        else:
            charts.update({first_level: test_data})

        with open(output_file, 'w') as fp:
            fp.write(json.dumps(charts, indent=2))


    def write_perf_keyval(self, perf_dict):
        self.write_iteration_keyval({}, perf_dict)


    def write_attr_keyval(self, attr_dict):
        self.write_iteration_keyval(attr_dict, {})


    def write_iteration_keyval(self, attr_dict, perf_dict):
        # append the dictionaries before they have the {perf} and {attr} added
        self._keyvals.append({'attr': attr_dict, 'perf': perf_dict})
        self._new_keyval = True

        if attr_dict:
            attr_dict = self._append_type_to_keys(attr_dict, "attr")
            utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr")

        if perf_dict:
            perf_dict = self._append_type_to_keys(perf_dict, "perf")
            utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf")

        keyval_path = os.path.join(self.resultsdir, "keyval")
        print >> open(keyval_path, "a"), ""


    def analyze_perf_constraints(self, constraints):
        if not self._new_keyval:
            return

        # create a dict from the keyvals suitable as an environment for eval
        keyval_env = self._keyvals[-1]['perf'].copy()
        keyval_env['__builtins__'] = None
        self._new_keyval = False
        failures = []

        # evaluate each constraint using the current keyvals
        for constraint in constraints:
            logging.info('___________________ constraint = %s', constraint)
            logging.info('___________________ keyvals = %s', keyval_env)

            try:
                if not eval(constraint, keyval_env):
                    failures.append('%s: constraint was not met' % constraint)
            except:
                failures.append('could not evaluate constraint: %s'
                                % constraint)

        # keep track of the errors for each iteration
        self.failed_constraints.append(failures)


    def process_failed_constraints(self):
        msg = ''
        for i, failures in enumerate(self.failed_constraints):
            if failures:
                msg += 'iteration %d:%s  ' % (i, ','.join(failures))

        if msg:
            raise error.TestFail(msg)


    def register_before_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register a before_iteration_hook.
        This adds the method to the list of hooks which are executed
        before each iteration.

        @param iteration_hook: Method to run before each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
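
        Example (illustrative; the hook below is hypothetical, not part of
        this module):

            def _log_iteration_start(test):
                logging.info('Starting iteration %d of %s',
                             test.iteration, test.tagged_testname)

            self.register_before_iteration_hook(_log_iteration_start)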
        """
        self.before_iteration_hooks.append(iteration_hook)


    def register_after_iteration_hook(self, iteration_hook):
        """
        This is how we expect test writers to register an after_iteration_hook.
        This adds the method to the list of hooks which are executed
        after each iteration. Hooks are executed starting with the most-
        recently registered, in stack fashion.

        @param iteration_hook: Method to run after each iteration. A valid
                               hook accepts a single argument which is the
                               test object.
        """
        self.after_iteration_hooks.append(iteration_hook)


    def initialize(self):
        pass


    def setup(self):
        pass


    def warmup(self, *args, **dargs):
        pass


    def drop_caches_between_iterations(self):
        if self.job.drop_caches_between_iterations:
            utils.drop_caches()


    def _call_run_once_with_retry(self, constraints, profile_only,
                                  postprocess_profiled_run, args, dargs):
        """Thin wrapper around _call_run_once that retries unsuccessful tests.

        If the job object's test_retry attribute is greater than 0, retry any
        test that runs unsuccessfully, up to test_retry times.
        *Note this does not completely re-initialize the test; it only
            re-executes code once all the initial job set up (packages,
            sysinfo, etc) is complete.
        """
        if self.job.test_retry != 0:
            logging.info('Test will be retried a maximum of %d times',
                         self.job.test_retry)

        max_runs = self.job.test_retry
        for retry_run in xrange(0, max_runs+1):
            try:
                self._call_run_once(constraints, profile_only,
                                    postprocess_profiled_run, args, dargs)
                break
            except error.TestFailRetry as err:
                if retry_run == max_runs:
                    raise
                self.job.record('INFO', None, None, 'Run %s failed with %s' % (
                        retry_run, err))
        if retry_run > 0:
            self.write_test_keyval({'test_retries_before_success': retry_run})


    def _call_run_once(self, constraints, profile_only,
                       postprocess_profiled_run, args, dargs):
        self.drop_caches_between_iterations()
        # execute iteration hooks
        if not self.job.fast:
            logging.debug('Starting before_iteration_hooks for %s',
                          self.tagged_testname)
            with metrics.SecondsTimer(
                    'chromeos/autotest/job/before_iteration_hook_duration'):
                for hook in self.before_iteration_hooks:
                    hook(self)
            logging.debug('before_iteration_hooks completed')

        finished = False
        try:
            if profile_only:
                if not self.job.profilers.present():
                    self.job.record('WARN', None, None,
                                    'No profilers have been added but '
                                    'profile_only is set - nothing '
                                    'will be run')
                self.run_once_profiling(postprocess_profiled_run,
                                        *args, **dargs)
            else:
                self.before_run_once()
                logging.debug('starting test(run_once()), test details follow'
                              '\n%r', args)
                self.run_once(*args, **dargs)
                logging.debug('The test has completed successfully')
                self.after_run_once()

            self.postprocess_iteration()
            self.analyze_perf_constraints(constraints)
            finished = True
        # Catch and re-raise to let after_iteration_hooks see the exception.
        except Exception as e:
            logging.debug('Test failed due to %s. Exception log follows the '
                          'after_iteration_hooks.', str(e))
            raise
        finally:
            if not finished or not self.job.fast:
                logging.debug('Starting after_iteration_hooks for %s',
                              self.tagged_testname)
                with metrics.SecondsTimer(
                        'chromeos/autotest/job/after_iteration_hook_duration'):
                    for hook in reversed(self.after_iteration_hooks):
                        hook(self)
                logging.debug('after_iteration_hooks completed')


    def execute(self, iterations=None, test_length=None, profile_only=None,
                _get_time=time.time, postprocess_profiled_run=None,
                constraints=(), *args, **dargs):
        """
        This is the basic execute method for tests inherited from base_test.
        If you want to implement a benchmark test, it's better to implement
        the run_once function so that it cooperates with the profiling
        infrastructure. For other tests, you can just override the default
        implementation.

        @param test_length: The minimum test length in seconds. We'll run the
            run_once function for as many iterations as needed to cover the
            minimum test length.

        @param iterations: The number of times to run the run_once function.
            This parameter is incompatible with test_length and will be
            silently ignored if you specify both.

        @param profile_only: If True, run all iterations with profilers
            enabled. If False, run all iterations and then one extra iteration
            with profiling, if profilers are present. If None, default to the
            value of job.default_profile_only.

        @param _get_time: [time.time] Used for unit test time injection.

        @param postprocess_profiled_run: Run the postprocessing for the
            profiled run.
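
        Example (an illustrative sketch; 'platform_Example' and its
        'pages' argument are hypothetical):

            # From a control file: run run_once(pages=10) three times, plus
            # one profiled run if profilers were added to the job.
            job.run_test('platform_Example', iterations=3, pages=10)

            # Keep re-running run_once until at least 60 seconds have elapsed.
            job.run_test('platform_Example', test_length=60, pages=10)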
        """

        # For our special class of tests, the benchmarks, we don't want
        # profilers to run during the test iterations. Let's reserve only
        # the last iteration for profiling, if needed. So let's stop
        # all profilers if they are present and active.
        profilers = self.job.profilers
        if profilers.active():
            profilers.stop(self)
        if profile_only is None:
            profile_only = self.job.default_profile_only
        # If the user called this test in an odd way (specified both iterations
        # and test_length), let's warn them.
        if iterations and test_length:
            logging.debug('Iterations parameter ignored (timed execution)')
        if test_length:
            test_start = _get_time()
            time_elapsed = 0
            timed_counter = 0
            logging.debug('Test started. Specified %d s as the minimum test '
                          'length', test_length)
            while time_elapsed < test_length:
                timed_counter = timed_counter + 1
                if time_elapsed == 0:
                    logging.debug('Executing iteration %d', timed_counter)
                elif time_elapsed > 0:
                    logging.debug('Executing iteration %d, time_elapsed %d s',
                                  timed_counter, time_elapsed)
                self._call_run_once_with_retry(constraints, profile_only,
                                               postprocess_profiled_run, args,
                                               dargs)
                test_iteration_finish = _get_time()
                time_elapsed = test_iteration_finish - test_start
            logging.debug('Test finished after %d iterations, '
                          'time elapsed: %d s', timed_counter, time_elapsed)
        else:
            if iterations is None:
                iterations = 1
            if iterations > 1:
                logging.debug('Test started. Specified %d iterations',
                              iterations)
            for self.iteration in xrange(1, iterations + 1):
                if iterations > 1:
                    logging.debug('Executing iteration %d of %d',
                                  self.iteration, iterations)
                self._call_run_once_with_retry(constraints, profile_only,
                                               postprocess_profiled_run, args,
                                               dargs)

        if not profile_only:
            self.iteration += 1
            self.run_once_profiling(postprocess_profiled_run, *args, **dargs)

        # Do any postprocessing, normally extracting performance keyvals, etc
        self.postprocess()
        self.process_failed_constraints()


    def run_once_profiling(self, postprocess_profiled_run, *args, **dargs):
        profilers = self.job.profilers
        # Do a profiling run if necessary
        if profilers.present():
            self.drop_caches_between_iterations()
            profilers.before_start(self)

            self.before_run_once()
            profilers.start(self)
            logging.debug('Profilers present. Profiling run started')

            try:
                self.run_once(*args, **dargs)

                # The run_once() argument takes priority over the attribute.
                postprocess_attribute = getattr(self,
                                                'postprocess_profiled_run',
                                                False)

                if (postprocess_profiled_run or
                    (postprocess_profiled_run is None and
                     postprocess_attribute)):
                    self.postprocess_iteration()

            finally:
                profilers.stop(self)
                profilers.report(self)

            self.after_run_once()


    def postprocess(self):
        pass


    def postprocess_iteration(self):
        pass


    def cleanup(self):
        pass


    def before_run_once(self):
        """
        Override in tests that need it; called before every run_once()
        call, including the profiling run (where it is called before the
        profilers are started).
        """
        pass


    def after_run_once(self):
        """
        Called after every run_once() call, including from a profiled run
        (where it is called after the profilers are stopped).
        """
        pass


    @staticmethod
    def _make_writable_to_others(directory):
        mode = os.stat(directory).st_mode
        mode = mode | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH
        os.chmod(directory, mode)


    def _exec(self, args, dargs):
        self.job.logging.tee_redirect_debug_dir(self.debugdir,
                                                log_name=self.tagged_testname)
        try:
            # write out the test attributes into a keyval
            dargs = dargs.copy()
            run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup)
            keyvals = dargs.pop('test_attributes', {}).copy()
            keyvals['version'] = self.version
            for i, arg in enumerate(args):
                keyvals['param-%d' % i] = repr(arg)
            for name, arg in dargs.iteritems():
                keyvals['param-%s' % name] = repr(arg)
            self.write_test_keyval(keyvals)

            _validate_args(args, dargs, self.initialize, self.setup,
                           self.execute, self.cleanup)

            try:
                # Make resultsdir and tmpdir accessible to everyone. We may
                # output data to these directories as other users, e.g.,
                # chronos.
                self._make_writable_to_others(self.tmpdir)
                self._make_writable_to_others(self.resultsdir)

                # Initialize:
                _cherry_pick_call(self.initialize, *args, **dargs)

                lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w')
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_EX)
                    # Setup: (compile and install the test, if needed)
                    p_args, p_dargs = _cherry_pick_args(self.setup, args, dargs)
                    utils.update_version(self.srcdir, self.preserve_srcdir,
                                         self.version, self.setup,
                                         *p_args, **p_dargs)
                finally:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                    lockfile.close()

                # Execute:
                os.chdir(self.outputdir)

                # call self.warmup, cherry picking the arguments it accepts
                # and translating exceptions if needed
                _call_test_function(_cherry_pick_call, self.warmup,
                                    *args, **dargs)

                if hasattr(self, 'run_once'):
                    p_args, p_dargs = _cherry_pick_args(self.run_once,
                                                        args, dargs)
                    # pull in any non-* and non-** args from self.execute
                    for param in _get_nonstar_args(self.execute):
                        if param in dargs:
                            p_dargs[param] = dargs[param]
                else:
                    p_args, p_dargs = _cherry_pick_args(self.execute,
                                                        args, dargs)

                _call_test_function(self.execute, *p_args, **p_dargs)
            except Exception:
                # Save the exception while we run our cleanup() before
                # re-raising it, but log it so the actual time of the error
                # is known.
                exc_info = sys.exc_info()
                logging.warning('The test failed with the following exception',
                                exc_info=True)

                try:
                    try:
                        if run_cleanup:
                            logging.debug('Running cleanup for test.')
                            _cherry_pick_call(self.cleanup, *args, **dargs)
                    except Exception:
                        logging.error('Ignoring exception during cleanup() '
                                      'phase:')
                        traceback.print_exc()
                        logging.error('Now raising the earlier %s error',
                                      exc_info[0])
                    self.crash_handler_report()
                finally:
                    # Raise exception after running cleanup, reporting crash,
                    # and restoring job's logging, even if the first two
                    # actions fail.
                    self.job.logging.restore()
                    try:
                        raise exc_info[0], exc_info[1], exc_info[2]
                    finally:
                        # http://docs.python.org/library/sys.html#sys.exc_info
                        # Be nice and prevent a circular reference.
                        del exc_info
            else:
                try:
                    if run_cleanup:
                        _cherry_pick_call(self.cleanup, *args, **dargs)
                    self.crash_handler_report()
                finally:
                    self.job.logging.restore()
        except error.AutotestError:
            # Pass already-categorized errors on up.
            raise
        except Exception, e:
            # Anything else is an ERROR in our own code, not execute().
            raise error.UnhandledTestError(e)

    def runsubtest(self, url, *args, **dargs):
        """
        Execute another autotest test from inside the current test's scope.

        @param url: URL of the new test.
        @param tag: Tag added to the subtest's name.
        @param args: Positional args for the subtest.
        @param dargs: Dictionary of named args for the subtest.
        @param iterations: Number of subtest iterations.
        @param profile_only: If True, execute one profiled run.
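
        Example (illustrative; 'platform_Example' is a hypothetical test
        name):

            self.runsubtest('platform_Example', tag='as_subtest')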
        """
        dargs["profile_only"] = dargs.get("profile_only", False)
        test_basepath = self.outputdir[len(self.job.resultdir + "/"):]
        return self.job.run_test(url, master_testpath=test_basepath,
                                 *args, **dargs)


def _get_nonstar_args(func):
    """Extract all the (normal) function parameter names.

    Given a function, returns a tuple of parameter names, specifically
    excluding the * and ** parameters, if the function accepts them.

    @param func: A callable that we want to choose arguments for.

    @return: A tuple of parameters accepted by the function.
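
    Example (illustrative):

        _get_nonstar_args(lambda a, b, *args, **dargs: None)
        # -> ('a', 'b')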
    """
    return func.func_code.co_varnames[:func.func_code.co_argcount]


def _cherry_pick_args(func, args, dargs):
    """Sanitize positional and keyword arguments before calling a function.

    Given a callable (func), an argument tuple and a dictionary of keyword
    arguments, pick only those arguments which the function is prepared to
    accept and return a new argument tuple and keyword argument dictionary.

    Args:
      func: A callable that we want to choose arguments for.
      args: A tuple of positional arguments to consider passing to func.
      dargs: A dictionary of keyword arguments to consider passing to func.
    Returns:
      A tuple of: (args tuple, keyword arguments dictionary)
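
    Example (illustrative):

        def f(x, **dargs):
            return x

        _cherry_pick_args(f, ('ignored',), {'x': 1, 'y': 2})
        # -> ((), {'x': 1, 'y': 2}); f has no *args but does accept **dargs.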
    """
    # Cherry pick args:
    if func.func_code.co_flags & 0x04:
        # func accepts *args, so return the entire args.
        p_args = args
    else:
        p_args = ()

    # Cherry pick dargs:
    if func.func_code.co_flags & 0x08:
        # func accepts **dargs, so return the entire dargs.
        p_dargs = dargs
    else:
        # Only return the keyword arguments that func accepts.
        p_dargs = {}
        for param in _get_nonstar_args(func):
            if param in dargs:
                p_dargs[param] = dargs[param]

    return p_args, p_dargs


def _cherry_pick_call(func, *args, **dargs):
    """Cherry picks arguments from args/dargs based on what "func" accepts
    and calls the function with the picked arguments."""
    p_args, p_dargs = _cherry_pick_args(func, args, dargs)
    return func(*p_args, **p_dargs)


def _validate_args(args, dargs, *funcs):
    """Verify that arguments are appropriate for at least one callable.

    Given a list of callables as additional parameters, verify that
    the proposed keyword arguments in dargs will each be accepted by at least
    one of the callables.

    NOTE: args is currently not supported and must be empty.

    Args:
      args: A tuple of proposed positional arguments.
      dargs: A dictionary of proposed keyword arguments.
      *funcs: Callables to be searched for acceptance of args and dargs.
    Raises:
      error.AutotestError: if an arg won't be accepted by any of *funcs.
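
    Example (illustrative):

        def initialize(volume):
            pass

        def run_once(pages):
            pass

        # Accepted: each keyword arg is consumed by at least one callable.
        _validate_args((), {'volume': 5, 'pages': 10}, initialize, run_once)

        # Raises error.AutotestError: no callable accepts 'colour'.
        _validate_args((), {'colour': 'red'}, initialize, run_once)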
    """
    all_co_flags = 0
    all_varnames = ()
    for func in funcs:
        all_co_flags |= func.func_code.co_flags
        all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount]

    # Check if the given args belong to at least one of the methods below.
    if len(args) > 0:
        # Current implementation doesn't allow the use of args.
        raise error.TestError('Unnamed arguments not accepted. Please '
                              'call job.run_test with named args only')

    # Check if the given dargs belong to at least one of the methods below.
    if len(dargs) > 0:
        if not all_co_flags & 0x08:
            # no func accepts **dargs, so:
            for param in dargs:
                if param not in all_varnames:
                    raise error.AutotestError('Unknown parameter: %s' % param)



def _installtest(job, url):
    (group, name) = job.pkgmgr.get_package_name(url, 'test')

    # Bail if the test is already installed
    group_dir = os.path.join(job.testdir, "download", group)
    if os.path.exists(os.path.join(group_dir, name)):
        return (group, name)

    # If the group directory is missing, create it and add an empty
    # __init__.py so that sub-directories are considered for import.
    if not os.path.exists(group_dir):
        os.makedirs(group_dir)
        f = open(os.path.join(group_dir, '__init__.py'), 'w+')
        f.close()

    logging.debug("%s: installing test url=%s", name, url)
    tarball = os.path.basename(url)
    tarball_path = os.path.join(group_dir, tarball)
    test_dir = os.path.join(group_dir, name)
    job.pkgmgr.fetch_pkg(tarball, tarball_path,
                         repo_url=os.path.dirname(url))

    # Create the directory for the test
    if not os.path.exists(test_dir):
        os.mkdir(os.path.join(group_dir, name))

    job.pkgmgr.untar_pkg(tarball_path, test_dir)

    os.remove(tarball_path)

    # For this 'sub-object' to be importable via the name
    # 'group.name' we need to provide an __init__.py,
    # so link the main entry point to this.
    os.symlink(name + '.py', os.path.join(group_dir, name,
                                          '__init__.py'))

    # The test is now installed.
    return (group, name)


def _call_test_function(func, *args, **dargs):
    """Calls a test function and translates exceptions so that errors
    inside test code are considered test failures."""
    try:
        return func(*args, **dargs)
    except error.AutotestError:
        raise
    except Exception, e:
        # Other exceptions must be treated as a FAIL when
        # raised during the test functions
        raise error.UnhandledTestFail(e)


def runtest(job, url, tag, args, dargs,
            local_namespace={}, global_namespace={},
            before_test_hook=None, after_test_hook=None,
            before_iteration_hook=None, after_iteration_hook=None):
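    """Find, instantiate, and run a test (the body of job.run_test()).

    A brief sketch of the flow implemented below: resolve |url| to a test
    directory (downloading and installing a .tar.bz2 package first if
    needed), import the test module, instantiate the test class with
    (job, bindir, outputdir), register any iteration hooks, then call the
    instance's _exec(args, dargs) and record success.

    @param job: The job object this test belongs to.
    @param url: Either a plain test name or the URL of a test tarball.
    @param tag: Optional tag appended to the test's output directory name.
    @param args: Positional args passed through to the test (must be empty,
            see _validate_args).
    @param dargs: Keyword args passed through to the test.
    @param local_namespace, global_namespace: Namespaces used when importing
            and instantiating the test class.
    @param before_test_hook, after_test_hook: Optional callables invoked with
            the test instance before/after the test executes.
    @param before_iteration_hook, after_iteration_hook: Optional callables
            registered as per-iteration hooks on the test instance.
    """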
    local_namespace = local_namespace.copy()
    global_namespace = global_namespace.copy()
    # If this is not a plain test name then download and install the
    # specified test.
    if url.endswith('.tar.bz2'):
        (testgroup, testname) = _installtest(job, url)
        bindir = os.path.join(job.testdir, 'download', testgroup, testname)
        importdir = os.path.join(job.testdir, 'download')
        modulename = '%s.%s' % (re.sub('/', '.', testgroup), testname)
        classname = '%s.%s' % (modulename, testname)
        path = testname
    else:
        # If the test is local, it may be under either testdir or site_testdir.
        # Tests in site_testdir override tests defined in testdir.
        testname = path = url
        testgroup = ''
        path = re.sub(':', '/', testname)
        modulename = os.path.basename(path)
        classname = '%s.%s' % (modulename, modulename)

        # Try installing the test package.
        # The job object may be either a server side job or a client side job.
        # The 'install_pkg' method will be present only if it's a client side
        # job.
        if hasattr(job, 'install_pkg'):
            try:
                bindir = os.path.join(job.testdir, testname)
                job.install_pkg(testname, 'test', bindir)
            except error.PackageInstallError:
                # Continue as a fallback mechanism and see if the test code
                # already exists on the machine.
                pass

        bindir = None
        for dir in [job.testdir, getattr(job, 'site_testdir', None)]:
            if dir is not None and os.path.exists(os.path.join(dir, path)):
                importdir = bindir = os.path.join(dir, path)
        if not bindir:
            raise error.TestError(testname + ': test does not exist')

    subdir = os.path.join(dargs.pop('master_testpath', ""), testname)
    outputdir = os.path.join(job.resultdir, subdir)
    if tag:
        outputdir += '.' + tag

    local_namespace['job'] = job
    local_namespace['bindir'] = bindir
    local_namespace['outputdir'] = outputdir

    sys.path.insert(0, importdir)
    try:
        exec ('import %s' % modulename, local_namespace, global_namespace)
        exec ("mytest = %s(job, bindir, outputdir)" % classname,
              local_namespace, global_namespace)
    finally:
        sys.path.pop(0)

    pwd = os.getcwd()
    os.chdir(outputdir)

    try:
        mytest = global_namespace['mytest']
        mytest.success = False
        if not job.fast and before_test_hook:
            logging.info('Starting before_hook for %s', mytest.tagged_testname)
            with metrics.SecondsTimer(
                    'chromeos/autotest/job/before_hook_duration'):
                before_test_hook(mytest)
            logging.info('before_hook completed')

        # Register the hooks that were passed in, using the standard
        # iteration-hook registration methods.
        if before_iteration_hook:
            mytest.register_before_iteration_hook(before_iteration_hook)
        if after_iteration_hook:
            mytest.register_after_iteration_hook(after_iteration_hook)
        mytest._exec(args, dargs)
        mytest.success = True
    finally:
        os.chdir(pwd)
        if after_test_hook and (not mytest.success or not job.fast):
            logging.info('Starting after_hook for %s', mytest.tagged_testname)
            with metrics.SecondsTimer(
                    'chromeos/autotest/job/after_hook_duration'):
                after_test_hook(mytest)
            logging.info('after_hook completed')

        shutil.rmtree(mytest.tmpdir, ignore_errors=True)