1"""The main job wrapper
2
3This is the core infrastructure.
4
5Copyright Andy Whitcroft, Martin J. Bligh 2006
6"""
7
8import copy, os, re, shutil, sys, time, traceback, types, glob
9import logging, getpass, weakref
10from autotest_lib.client.bin import client_logging_config
11from autotest_lib.client.bin import utils, parallel, kernel, xen
12from autotest_lib.client.bin import profilers, boottool, harness
13from autotest_lib.client.bin import config, sysinfo, test, local_host
14from autotest_lib.client.bin import partition as partition_lib
15from autotest_lib.client.common_lib import base_job
16from autotest_lib.client.common_lib import error, barrier, logging_manager
17from autotest_lib.client.common_lib import base_packages, packages
18from autotest_lib.client.common_lib import global_config
19from autotest_lib.client.tools import html_report
20
21GLOBAL_CONFIG = global_config.global_config
22
23LAST_BOOT_TAG = object()
24JOB_PREAMBLE = """
25from autotest_lib.client.common_lib.error import *
26from autotest_lib.client.bin.utils import *
27"""
28
29
30class StepError(error.AutotestError):
31    pass
32
33class NotAvailableError(error.AutotestError):
34    pass
35
36
37
38def _run_test_complete_on_exit(f):
39    """Decorator for job methods that automatically calls
40    self.harness.run_test_complete when the method exits, if appropriate."""
41    def wrapped(self, *args, **dargs):
42        try:
43            return f(self, *args, **dargs)
44        finally:
45            if self._logger.global_filename == 'status':
46                self.harness.run_test_complete()
47                if self.drop_caches:
48                    utils.drop_caches()
49    wrapped.__name__ = f.__name__
50    wrapped.__doc__ = f.__doc__
51    wrapped.__dict__.update(f.__dict__)
52    return wrapped
53
54
55class status_indenter(base_job.status_indenter):
56    """Provide a status indenter that is backed by job._record_prefix."""
57    def __init__(self, job):
58        self.job = weakref.proxy(job)  # avoid a circular reference
59
60
61    @property
62    def indent(self):
63        return self.job._record_indent
64
65
66    def increment(self):
67        self.job._record_indent += 1
68
69
70    def decrement(self):
71        self.job._record_indent -= 1
72
73
74class base_client_job(base_job.base_job):
75    """The client-side concrete implementation of base_job.
76
77    Optional properties provided by this implementation:
78        control
79        bootloader
80        harness
81    """
82
83    _WARNING_DISABLE_DELAY = 5
84
85    # _record_indent is a persistent property, but only on the client
86    _job_state = base_job.base_job._job_state
87    _record_indent = _job_state.property_factory(
88        '_state', '_record_indent', 0, namespace='client')
89    _max_disk_usage_rate = _job_state.property_factory(
90        '_state', '_max_disk_usage_rate', 0.0, namespace='client')
91
92
93    def __init__(self, control, options, drop_caches=True,
94                 extra_copy_cmdline=None):
95        """
96        Prepare a client side job object.
97
        @param control: The pathname of the control file.
99        @param options: an object which includes:
100                jobtag: The job tag string (eg "default").
                cont: Whether this is a continuation of a previous run of
                          this job.
102                harness_type: An alternative server harness.  [None]
103                use_external_logging: If true, the enable_external_logging
104                          method will be called during construction.  [False]
105        @param drop_caches: If true, utils.drop_caches() is called before and
106                between all tests.  [True]
        @param extra_copy_cmdline: list of additional /proc/cmdline arguments
                to copy from the running kernel to all kernels installed by
                this job
110        """
111        super(base_client_job, self).__init__(options=options)
112        self._pre_record_init(control, options)
113        try:
114            self._post_record_init(control, options, drop_caches,
115                                   extra_copy_cmdline)
116        except Exception, err:
117            self.record(
                    'ABORT', None, None,
                    'client.bin.job.__init__ failed: %s' % str(err))
120            raise
121
122
123    @classmethod
124    def _get_environ_autodir(cls):
125        return os.environ['AUTODIR']
126
127
128    @classmethod
129    def _find_base_directories(cls):
130        """
131        Determine locations of autodir and clientdir (which are the same)
132        using os.environ. Serverdir does not exist in this context.
133        """
134        autodir = clientdir = cls._get_environ_autodir()
135        return autodir, clientdir, None
136
137
138    @classmethod
139    def _parse_args(cls, args):
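        # Sketch of the expected behaviour (the input string is illustrative):
        #   _parse_args('arg1 "two words" arg3')
        #   => ['arg1', '"two words"', 'arg3']
        # i.e. quoted substrings are kept as single arguments (quotes
        # included) and everything else is split on whitespace.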
        return re.findall(r"[^\s]*?['\"].*?['\"]|[^\s]+", args)
141
142
143    def _find_resultdir(self, options):
144        """
145        Determine the directory for storing results. On a client this is
146        always <autodir>/results/<tag>, where tag is passed in on the command
147        line as an option.
148        """
149        output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
150                                                           'output_dir',
151                                                            default="")
152        if options.output_dir:
153            basedir = options.output_dir
154        elif output_dir_config:
155            basedir = output_dir_config
156        else:
157            basedir = self.autodir
158
159        return os.path.join(basedir, 'results', options.tag)
160
161
162    def _get_status_logger(self):
163        """Return a reference to the status logger."""
164        return self._logger
165
166
167    def _pre_record_init(self, control, options):
168        """
        Initialization function that should perform ONLY the required
170        setup so that the self.record() method works.
171
172        As of now self.record() needs self.resultdir, self._group_level,
173        self.harness and of course self._logger.
174        """
175        if not options.cont:
176            self._cleanup_debugdir_files()
177            self._cleanup_results_dir()
178
179        logging_manager.configure_logging(
180            client_logging_config.ClientLoggingConfig(),
181            results_dir=self.resultdir,
182            verbose=options.verbose)
183        logging.info('Writing results to %s', self.resultdir)
184
185        # init_group_level needs the state
186        self.control = os.path.realpath(control)
187        self._is_continuation = options.cont
188        self._current_step_ancestry = []
189        self._next_step_index = 0
190        self._load_state()
191
192        _harness = self.handle_persistent_option(options, 'harness')
193        _harness_args = self.handle_persistent_option(options, 'harness_args')
194
195        self.harness = harness.select(_harness, self, _harness_args)
196
197        # set up the status logger
198        def client_job_record_hook(entry):
199            msg_tag = ''
200            if '.' in self._logger.global_filename:
201                msg_tag = self._logger.global_filename.split('.', 1)[1]
202            # send the entry to the job harness
203            message = '\n'.join([entry.message] + entry.extra_message_lines)
204            rendered_entry = self._logger.render_entry(entry)
205            self.harness.test_status_detail(entry.status_code, entry.subdir,
206                                            entry.operation, message, msg_tag,
207                                            entry.fields)
208            self.harness.test_status(rendered_entry, msg_tag)
209            # send the entry to stdout, if it's enabled
210            logging.info(rendered_entry)
211        self._logger = base_job.status_logger(
212            self, status_indenter(self), record_hook=client_job_record_hook,
213            tap_writer=self._tap)
214
215    def _post_record_init(self, control, options, drop_caches,
216                          extra_copy_cmdline):
217        """
218        Perform job initialization not required by self.record().
219        """
220        self._init_drop_caches(drop_caches)
221
222        self._init_packages()
223
224        self.sysinfo = sysinfo.sysinfo(self.resultdir)
225        self._load_sysinfo_state()
226
227        if not options.cont:
228            download = os.path.join(self.testdir, 'download')
229            if not os.path.exists(download):
230                os.mkdir(download)
231
232            shutil.copyfile(self.control,
233                            os.path.join(self.resultdir, 'control'))
234
235        self.control = control
236
237        self.logging = logging_manager.get_logging_manager(
238                manage_stdout_and_stderr=True, redirect_fds=True)
239        self.logging.start_logging()
240
241        self._config = config.config(self)
242        self.profilers = profilers.profilers(self)
243
244        self._init_bootloader()
245
246        self.machines = [options.hostname]
247        self.machine_dict_list = [{'hostname' : options.hostname}]
248        # Client side tests should always run the same whether or not they are
249        # running in the lab.
250        self.in_lab = False
251        self.hosts = set([local_host.LocalHost(hostname=options.hostname,
252                                               bootloader=self.bootloader)])
253
254        self.args = []
255        if options.args:
256            self.args = self._parse_args(options.args)
257
258        if options.user:
259            self.user = options.user
260        else:
261            self.user = getpass.getuser()
262
263        self.sysinfo.log_per_reboot_data()
264
265        if not options.cont:
266            self.record('START', None, None)
267
268        self.harness.run_start()
269
270        if options.log:
271            self.enable_external_logging()
272
273        self._init_cmdline(extra_copy_cmdline)
274
275        self.num_tests_run = None
276        self.num_tests_failed = None
277
278        self.warning_loggers = None
279        self.warning_manager = None
280
281
282    def _init_drop_caches(self, drop_caches):
283        """
284        Perform the drop caches initialization.
285        """
286        self.drop_caches_between_iterations = (
287                                    GLOBAL_CONFIG.get_config_value('CLIENT',
288                                    'drop_caches_between_iterations',
289                                    type=bool, default=True))
290        self.drop_caches = drop_caches
291        if self.drop_caches:
292            utils.drop_caches()
293
294
295    def _init_bootloader(self):
296        """
297        Perform boottool initialization.
298        """
299        tool = self.config_get('boottool.executable')
300        self.bootloader = boottool.boottool(tool)
301
302
303    def _init_packages(self):
304        """
305        Perform the packages support initialization.
306        """
307        self.pkgmgr = packages.PackageManager(
308            self.autodir, run_function_dargs={'timeout':3600})
309
310
311    def _init_cmdline(self, extra_copy_cmdline):
312        """
313        Initialize default cmdline for booted kernels in this job.
314        """
315        copy_cmdline = set(['console'])
316        if extra_copy_cmdline is not None:
317            copy_cmdline.update(extra_copy_cmdline)
318
319        # extract console= and other args from cmdline and add them into the
320        # base args that we use for all kernels we install
321        cmdline = utils.read_one_line('/proc/cmdline')
322        kernel_args = []
323        for karg in cmdline.split():
324            for param in copy_cmdline:
325                if karg.startswith(param) and \
326                    (len(param) == len(karg) or karg[len(param)] == '='):
327                    kernel_args.append(karg)
328        self.config_set('boot.default_args', ' '.join(kernel_args))
329
330
331    def _cleanup_results_dir(self):
332        """Delete everything in resultsdir"""
333        assert os.path.exists(self.resultdir)
334        list_files = glob.glob('%s/*' % self.resultdir)
335        for f in list_files:
336            if os.path.isdir(f):
337                shutil.rmtree(f)
338            elif os.path.isfile(f):
339                os.remove(f)
340
341
342    def _cleanup_debugdir_files(self):
343        """
344        Delete any leftover debugdir files
345        """
346        list_files = glob.glob("/tmp/autotest_results_dir.*")
347        for f in list_files:
348            os.remove(f)
349
350
351    def disable_warnings(self, warning_type):
352        self.record("INFO", None, None,
353                    "disabling %s warnings" % warning_type,
354                    {"warnings.disable": warning_type})
355        time.sleep(self._WARNING_DISABLE_DELAY)
356
357
358    def enable_warnings(self, warning_type):
359        time.sleep(self._WARNING_DISABLE_DELAY)
360        self.record("INFO", None, None,
361                    "enabling %s warnings" % warning_type,
362                    {"warnings.enable": warning_type})
363
364
365    def monitor_disk_usage(self, max_rate):
366        """\
367        Signal that the job should monitor disk space usage on /
368        and generate a warning if a test uses up disk space at a
369        rate exceeding 'max_rate'.
370
371        Parameters:
             max_rate - the maximum allowed rate of disk consumption
373                        during a test, in MB/hour, or 0 to indicate
374                        no limit.
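
        Example (the limit value is illustrative): job.monitor_disk_usage(500)
        causes a WARN to be recorded if a test consumes disk space on / at
        more than 500 MB/hour.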
375        """
376        self._max_disk_usage_rate = max_rate
377
378
379    def relative_path(self, path):
380        """\
        Return a path relative to the job results directory
        """
        head = len(self.resultdir) + 1     # remove the / in between
384        return path[head:]
385
386
387    def control_get(self):
388        return self.control
389
390
391    def control_set(self, control):
392        self.control = os.path.abspath(control)
393
394
395    def harness_select(self, which, harness_args):
396        self.harness = harness.select(which, self, harness_args)
397
398
399    def config_set(self, name, value):
400        self._config.set(name, value)
401
402
403    def config_get(self, name):
404        return self._config.get(name)
405
406
407    def setup_dirs(self, results_dir, tmp_dir):
408        if not tmp_dir:
409            tmp_dir = os.path.join(self.tmpdir, 'build')
410        if not os.path.exists(tmp_dir):
411            os.mkdir(tmp_dir)
412        if not os.path.isdir(tmp_dir):
413            e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
414            raise ValueError(e_msg)
415
416        # We label the first build "build" and then subsequent ones
417        # as "build.2", "build.3", etc. Whilst this is a little bit
418        # inconsistent, 99.9% of jobs will only have one build
419        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
421        if not results_dir:
422            results_dir = os.path.join(self.resultdir, 'build')
423            i = 2
424            while os.path.exists(results_dir):
425                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
426                i += 1
427        if not os.path.exists(results_dir):
428            os.mkdir(results_dir)
429
430        return (results_dir, tmp_dir)
431
432
433    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
434                            kjob = None ):
435        """Summon a xen object"""
436        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
437        build_dir = 'xen'
438        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
439                       leave, kjob)
440
441
442    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
443        """Summon a kernel object"""
444        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
445        build_dir = 'linux'
446        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
447                                  build_dir, leave)
448
449
450    def barrier(self, *args, **kwds):
451        """Create a barrier object"""
452        return barrier.barrier(*args, **kwds)
453
454
455    def install_pkg(self, name, pkg_type, install_dir):
456        '''
457        This method is a simple wrapper around the actual package
458        installation method in the Packager class. This is used
459        internally by the profilers, deps and tests code.
460        name : name of the package (ex: sleeptest, dbench etc.)
461        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory into which the package source is
                      untarred. (ex: client/profilers/<name> for profilers)
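
        Example (sketch; the test name and install path are illustrative):
            job.install_pkg('sleeptest', 'test',
                            os.path.join(job.autodir, 'tests', 'sleeptest'))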
464        '''
465        if self.pkgmgr.repositories:
466            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)
467
468
469    def add_repository(self, repo_urls):
470        '''
471        Adds the repository locations to the job so that packages
472        can be fetched from them when needed. The repository list
        needs to be a list of URL strings.
474        Ex: job.add_repository(['http://blah1','http://blah2'])
475        '''
476        for repo_url in repo_urls:
477            self.pkgmgr.add_repository(repo_url)
478
479        # Fetch the packages' checksum file that contains the checksums
480        # of all the packages if it is not already fetched. The checksum
481        # is always fetched whenever a job is first started. This
482        # is not done in the job's constructor as we don't have the list of
483        # the repositories there (and obviously don't care about this file
484        # if we are not using the repos)
485        try:
486            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
487                                              base_packages.CHECKSUM_FILE)
488            self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE,
489                                  checksum_file_path, use_checksum=False)
490        except error.PackageFetchError:
491            # packaging system might not be working in this case
492            # Silently fall back to the normal case
493            pass
494
495
496    def require_gcc(self):
497        """
        Check that gcc is installed on the machine; raise NotAvailableError
        if it is not.
499        """
500        # check if gcc is installed on the system.
501        try:
502            utils.system('which gcc')
503        except error.CmdError, e:
504            raise NotAvailableError('gcc is required by this job and is '
505                                    'not available on the system')
506
507
508    def setup_dep(self, deps):
509        """Set up the dependencies for this test.
510        deps is a list of libraries required for this test.
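
        Example (sketch; the dependency name is illustrative), typically
        called from a test's setup() method:
            self.job.setup_dep(['libaio'])
            dep_dir = os.path.join(self.job.autodir, 'deps', 'libaio')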
511        """
512        # Fetch the deps from the repositories and set them up.
513        for dep in deps:
514            dep_dir = os.path.join(self.autodir, 'deps', dep)
515            # Search for the dependency in the repositories if specified,
516            # else check locally.
517            try:
518                self.install_pkg(dep, 'dep', dep_dir)
519            except error.PackageInstallError:
520                # see if the dep is there locally
521                pass
522
523            # dep_dir might not exist if it is not fetched from the repos
524            if not os.path.exists(dep_dir):
525                raise error.TestError("Dependency %s does not exist" % dep)
526
527            os.chdir(dep_dir)
528            if execfile('%s.py' % dep, {}) is None:
                logging.info('Dependency %s successfully built', dep)
530
531
532    def _runtest(self, url, tag, timeout, args, dargs):
533        try:
534            l = lambda : test.runtest(self, url, tag, args, dargs)
535            pid = parallel.fork_start(self.resultdir, l)
536
537            if timeout:
538                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
539                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
540            else:
541                parallel.fork_waitfor(self.resultdir, pid)
542
543        except error.TestBaseException:
544            # These are already classified with an error type (exit_status)
545            raise
546        except error.JobError:
547            raise  # Caught further up and turned into an ABORT.
548        except Exception, e:
549            # Converts all other exceptions thrown by the test regardless
550            # of phase into a TestError(TestBaseException) subclass that
551            # reports them with their full stack trace.
552            raise error.UnhandledTestError(e)
553
554
555    def _run_test_base(self, url, *args, **dargs):
556        """
        Prepare the arguments and group function used by run_test and
        run_test_detail.
558
559        @param url A url that identifies the test to run.
560        @param tag An optional keyword argument that will be added to the
561            test and subdir name.
562        @param subdir_tag An optional keyword argument that will be added
563            to the subdir name.
564
565        @returns:
566                subdir: Test subdirectory
567                testname: Test name
568                group_func: Actual test run function
569                timeout: Test timeout
570        """
571        group, testname = self.pkgmgr.get_package_name(url, 'test')
572        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
573        outputdir = self._make_test_outputdir(subdir)
574
575        timeout = dargs.pop('timeout', None)
576        if timeout:
577            logging.debug('Test has timeout: %d sec.', timeout)
578
579        def log_warning(reason):
580            self.record("WARN", subdir, testname, reason)
581        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
582        def group_func():
583            try:
584                self._runtest(url, tag, timeout, args, dargs)
585            except error.TestBaseException, detail:
586                # The error is already classified, record it properly.
587                self.record(detail.exit_status, subdir, testname, str(detail))
588                raise
589            else:
590                self.record('GOOD', subdir, testname, 'completed successfully')
591
592        return (subdir, testname, group_func, timeout)
593
594
595    @_run_test_complete_on_exit
596    def run_test(self, url, *args, **dargs):
597        """
598        Summon a test object and run it.
599
600        @param url A url that identifies the test to run.
601        @param tag An optional keyword argument that will be added to the
602            test and subdir name.
603        @param subdir_tag An optional keyword argument that will be added
604            to the subdir name.
605
606        @returns True if the test passes, False otherwise.
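
        Example (sketch; the test name and arguments are illustrative):
            job.run_test('sleeptest', tag='smoke', seconds=1)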
607        """
608        (subdir, testname, group_func, timeout) = self._run_test_base(url,
609                                                                      *args,
610                                                                      **dargs)
611        try:
612            self._rungroup(subdir, testname, group_func, timeout)
613            return True
614        except error.TestBaseException:
615            return False
616        # Any other exception here will be given to the caller
617        #
618        # NOTE: The only exception possible from the control file here
619        # is error.JobError as _runtest() turns all others into an
620        # UnhandledTestError that is caught above.
621
622
623    @_run_test_complete_on_exit
624    def run_test_detail(self, url, *args, **dargs):
625        """
626        Summon a test object and run it, returning test status.
627
628        @param url A url that identifies the test to run.
629        @param tag An optional keyword argument that will be added to the
630            test and subdir name.
631        @param subdir_tag An optional keyword argument that will be added
632            to the subdir name.
633
634        @returns Test status
635        @see: client/common_lib/error.py, exit_status
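
        Example (sketch; the test name is illustrative):
            status = job.run_test_detail('sleeptest')
        where status is e.g. 'GOOD' or 'FAIL' (see exit_status in error.py).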
636        """
637        (subdir, testname, group_func, timeout) = self._run_test_base(url,
638                                                                      *args,
639                                                                      **dargs)
640        try:
641            self._rungroup(subdir, testname, group_func, timeout)
642            return 'GOOD'
643        except error.TestBaseException, detail:
644            return detail.exit_status
645
646
647    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
648        """\
649        subdir:
650                name of the group
651        testname:
652                name of the test to run, or support step
        function:
                subroutine to run
        timeout:
                optional timeout value to record with the START status entry
        *args:
                arguments for the function
657
658        Returns the result of the passed in function
659        """
660
661        try:
662            optional_fields = None
663            if timeout:
664                optional_fields = {}
665                optional_fields['timeout'] = timeout
666            self.record('START', subdir, testname,
667                        optional_fields=optional_fields)
668
669            self._state.set('client', 'unexpected_reboot', (subdir, testname))
670            try:
671                result = function(*args, **dargs)
672                self.record('END GOOD', subdir, testname)
673                return result
674            except error.TestBaseException, e:
675                self.record('END %s' % e.exit_status, subdir, testname)
676                raise
677            except error.JobError, e:
678                self.record('END ABORT', subdir, testname)
679                raise
680            except Exception, e:
681                # This should only ever happen due to a bug in the given
682                # function's code.  The common case of being called by
683                # run_test() will never reach this.  If a control file called
684                # run_group() itself, bugs in its function will be caught
685                # here.
686                err_msg = str(e) + '\n' + traceback.format_exc()
687                self.record('END ERROR', subdir, testname, err_msg)
688                raise
689        finally:
690            self._state.discard('client', 'unexpected_reboot')
691
692
693    def run_group(self, function, tag=None, **dargs):
694        """
695        Run a function nested within a group level.
696
697        function:
698                Callable to run.
699        tag:
700                An optional tag name for the group.  If None (default)
701                function.__name__ will be used.
702        **dargs:
703                Named arguments for the function.
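
        Example (sketch; the function and tag are illustrative):
            def my_group():
                job.run_test('sleeptest')
            job.run_group(my_group, tag='sleep')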
704        """
705        if tag:
706            name = tag
707        else:
708            name = function.__name__
709
710        try:
711            return self._rungroup(subdir=None, testname=name,
712                                  function=function, timeout=None, **dargs)
713        except (SystemExit, error.TestBaseException):
714            raise
715        # If there was a different exception, turn it into a TestError.
716        # It will be caught by step_engine or _run_step_fn.
717        except Exception, e:
718            raise error.UnhandledTestError(e)
719
720
721    def cpu_count(self):
722        return utils.count_cpus()  # use total system count
723
724
725    def start_reboot(self):
726        self.record('START', None, 'reboot')
727        self.record('GOOD', None, 'reboot.start')
728
729
730    def _record_reboot_failure(self, subdir, operation, status,
731                               running_id=None):
732        self.record("ABORT", subdir, operation, status)
733        if not running_id:
734            running_id = utils.running_os_ident()
735        kernel = {"kernel": running_id.split("::")[0]}
736        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
737
738
739    def _check_post_reboot(self, subdir, running_id=None):
740        """
        Perform post-boot checks, such as whether the system configuration
        has changed across reboots (specifically, CPUs and partitions).
743
744        @param subdir: The subdir to use in the job.record call.
745        @param running_id: An optional running_id to include in the reboot
746            failure log message
747
748        @raise JobError: Raised if the current configuration does not match the
749            pre-reboot configuration.
750        """
751        # check to see if any partitions have changed
752        partition_list = partition_lib.get_partition_list(self,
753                                                          exclude_swap=False)
754        mount_info = partition_lib.get_mount_info(partition_list)
755        old_mount_info = self._state.get('client', 'mount_info')
756        if mount_info != old_mount_info:
757            new_entries = mount_info - old_mount_info
758            old_entries = old_mount_info - mount_info
759            description = ("mounted partitions are different after reboot "
760                           "(old entries: %s, new entries: %s)" %
761                           (old_entries, new_entries))
762            self._record_reboot_failure(subdir, "reboot.verify_config",
763                                        description, running_id=running_id)
764            raise error.JobError("Reboot failed: %s" % description)
765
766        # check to see if any CPUs have changed
767        cpu_count = utils.count_cpus()
768        old_count = self._state.get('client', 'cpu_count')
769        if cpu_count != old_count:
770            description = ('Number of CPUs changed after reboot '
771                           '(old count: %d, new count: %d)' %
772                           (old_count, cpu_count))
773            self._record_reboot_failure(subdir, 'reboot.verify_config',
774                                        description, running_id=running_id)
775            raise error.JobError('Reboot failed: %s' % description)
776
777
778    def end_reboot(self, subdir, kernel, patches, running_id=None):
779        self._check_post_reboot(subdir, running_id=running_id)
780
781        # strip ::<timestamp> from the kernel version if present
782        kernel = kernel.split("::")[0]
783        kernel_info = {"kernel": kernel}
784        for i, patch in enumerate(patches):
785            kernel_info["patch%d" % i] = patch
786        self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)
787
788
789    def end_reboot_and_verify(self, expected_when, expected_id, subdir,
790                              type='src', patches=[]):
791        """ Check the passed kernel identifier against the command line
792            and the running kernel, abort the job on missmatch. """
793
794        logging.info("POST BOOT: checking booted kernel "
795                     "mark=%d identity='%s' type='%s'",
796                     expected_when, expected_id, type)
797
798        running_id = utils.running_os_ident()
799
800        cmdline = utils.read_one_line("/proc/cmdline")
801
802        find_sum = re.compile(r'.*IDENT=(\d+)')
803        m = find_sum.match(cmdline)
804        cmdline_when = -1
805        if m:
806            cmdline_when = int(m.groups()[0])
807
808        # We have all the facts, see if they indicate we
809        # booted the requested kernel or not.
810        bad = False
811        if (type == 'src' and expected_id != running_id or
812            type == 'rpm' and
813            not running_id.startswith(expected_id + '::')):
814            logging.error("Kernel identifier mismatch")
815            bad = True
816        if expected_when != cmdline_when:
817            logging.error("Kernel command line mismatch")
818            bad = True
819
820        if bad:
821            logging.error("   Expected Ident: " + expected_id)
822            logging.error("    Running Ident: " + running_id)
823            logging.error("    Expected Mark: %d", expected_when)
824            logging.error("Command Line Mark: %d", cmdline_when)
825            logging.error("     Command Line: " + cmdline)
826
827            self._record_reboot_failure(subdir, "reboot.verify", "boot failure",
828                                        running_id=running_id)
829            raise error.JobError("Reboot returned with the wrong kernel")
830
831        self.record('GOOD', subdir, 'reboot.verify',
832                    utils.running_os_full_version())
833        self.end_reboot(subdir, expected_id, patches, running_id=running_id)
834
835
836    def partition(self, device, loop_size=0, mountpoint=None):
837        """
838        Work with a machine partition
839
840            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
            @param mountpoint: Specify a directory to mount to. If not
                               specified, the autotest tmp directory is used.
843            @param loop_size: Size of loopback device (in MB). Defaults to 0.
844
845            @return: A L{client.bin.partition.partition} object
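
            Example (sketch; the device and loop file paths are illustrative):
                part = job.partition('/dev/sdb1')
                loop = job.partition('/tmp/loop_file', loop_size=1024)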
846        """
847
848        if not mountpoint:
849            mountpoint = self.tmpdir
850        return partition_lib.partition(self, device, loop_size, mountpoint)
851
852    @utils.deprecated
853    def filesystem(self, device, mountpoint=None, loop_size=0):
854        """ Same as partition
855
856        @deprecated: Use partition method instead
857        """
858        return self.partition(device, loop_size, mountpoint)
859
860
861    def enable_external_logging(self):
862        pass
863
864
865    def disable_external_logging(self):
866        pass
867
868
869    def reboot_setup(self):
870        # save the partition list and mount points, as well as the cpu count
871        partition_list = partition_lib.get_partition_list(self,
872                                                          exclude_swap=False)
873        mount_info = partition_lib.get_mount_info(partition_list)
874        self._state.set('client', 'mount_info', mount_info)
875        self._state.set('client', 'cpu_count', utils.count_cpus())
876
877
878    def reboot(self, tag=LAST_BOOT_TAG):
879        if tag == LAST_BOOT_TAG:
880            tag = self.last_boot_tag
881        else:
882            self.last_boot_tag = tag
883
884        self.reboot_setup()
885        self.harness.run_reboot()
886        default = self.config_get('boot.set_default')
887        if default:
888            self.bootloader.set_default(tag)
889        else:
890            self.bootloader.boot_once(tag)
891
        # HACK: loading netconsole as a module sometimes hangs shutdown, so
        # unload it first if it is installed
894        utils.system("modprobe -r netconsole", ignore_status=True)
895
896        # sync first, so that a sync during shutdown doesn't time out
897        utils.system("sync; sync", ignore_status=True)
898
899        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
900        self.quit()
901
902
903    def noop(self, text):
904        logging.info("job: noop: " + text)
905
906
907    @_run_test_complete_on_exit
908    def parallel(self, *tasklist):
909        """Run tasks in parallel"""
910
911        pids = []
912        old_log_filename = self._logger.global_filename
913        for i, task in enumerate(tasklist):
914            assert isinstance(task, (tuple, list))
915            self._logger.global_filename = old_log_filename + (".%d" % i)
916            def task_func():
917                # stub out _record_indent with a process-local one
918                base_record_indent = self._record_indent
919                proc_local = self._job_state.property_factory(
920                    '_state', '_record_indent.%d' % os.getpid(),
921                    base_record_indent, namespace='client')
922                self.__class__._record_indent = proc_local
923                task[0](*task[1:])
924            pids.append(parallel.fork_start(self.resultdir, task_func))
925
926        old_log_path = os.path.join(self.resultdir, old_log_filename)
927        old_log = open(old_log_path, "a")
928        exceptions = []
929        for i, pid in enumerate(pids):
930            # wait for the task to finish
931            try:
932                parallel.fork_waitfor(self.resultdir, pid)
933            except Exception, e:
934                exceptions.append(e)
935            # copy the logs from the subtask into the main log
936            new_log_path = old_log_path + (".%d" % i)
937            if os.path.exists(new_log_path):
938                new_log = open(new_log_path)
939                old_log.write(new_log.read())
940                new_log.close()
941                old_log.flush()
942                os.remove(new_log_path)
943        old_log.close()
944
945        self._logger.global_filename = old_log_filename
946
947        # handle any exceptions raised by the parallel tasks
948        if exceptions:
949            msg = "%d task(s) failed in job.parallel" % len(exceptions)
950            raise error.JobError(msg)
951
952
953    def quit(self):
954        # XXX: should have a better name.
955        self.harness.run_pause()
956        raise error.JobContinue("more to come")
957
958
959    def complete(self, status):
960        """Write pending TAP reports, clean up, and exit"""
961        # write out TAP reports
962        if self._tap.do_tap_report:
963            self._tap.write()
964            self._tap._write_tap_archive()
965
966        # write out a job HTML report
967        try:
968            html_report.create_report(self.resultdir)
969        except Exception, e:
970            logging.error("Error writing job HTML report: %s", e)
971
        # We are about to exit 'complete', so clean up the state file by
        # moving it into the results directory.
973        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
974        shutil.move(self._state_file, dest)
975
976        self.harness.run_complete()
977        self.disable_external_logging()
978        sys.exit(status)
979
980
981    def _load_state(self):
982        # grab any initial state and set up $CONTROL.state as the backing file
983        init_state_file = self.control + '.init.state'
984        self._state_file = self.control + '.state'
985        if os.path.exists(init_state_file):
986            shutil.move(init_state_file, self._state_file)
987        self._state.set_backing_file(self._state_file)
988
989        # initialize the state engine, if necessary
990        has_steps = self._state.has('client', 'steps')
991        if not self._is_continuation and has_steps:
992            raise RuntimeError('Loaded state can only contain client.steps if '
993                               'this is a continuation')
994
995        if not has_steps:
996            logging.debug('Initializing the state engine')
997            self._state.set('client', 'steps', [])
998
999
1000    def handle_persistent_option(self, options, option_name):
1001        """
1002        Select option from command line or persistent state.
1003        Store selected option to allow standalone client to continue
1004        after reboot with previously selected options.
1005        Priority:
1006        1. explicitly specified via command line
1007        2. stored in state file (if continuing job '-c')
1008        3. default == None
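
        Example (as used during job initialization):
            _harness = self.handle_persistent_option(options, 'harness')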
1009        """
1010        option = None
1011        cmd_line_option = getattr(options, option_name)
1012        if cmd_line_option:
1013            option = cmd_line_option
1014            self._state.set('client', option_name, option)
1015        else:
1016            stored_option = self._state.get('client', option_name, None)
1017            if stored_option:
1018                option = stored_option
1019        logging.debug('Persistent option %s now set to %s', option_name, option)
1020        return option
1021
1022
1023    def __create_step_tuple(self, fn, args, dargs):
1024        # Legacy code passes in an array where the first arg is
1025        # the function or its name.
1026        if isinstance(fn, list):
1027            assert(len(args) == 0)
1028            assert(len(dargs) == 0)
1029            args = fn[1:]
1030            fn = fn[0]
1031        # Pickling actual functions is hairy, thus we have to call
1032        # them by name.  Unfortunately, this means only functions
1033        # defined globally can be used as a next step.
1034        if callable(fn):
1035            fn = fn.__name__
1036        if not isinstance(fn, types.StringTypes):
1037            raise StepError("Next steps must be functions or "
1038                            "strings containing the function name")
1039        ancestry = copy.copy(self._current_step_ancestry)
1040        return (ancestry, fn, args, dargs)
1041
1042
1043    def next_step_append(self, fn, *args, **dargs):
1044        """Define the next step and place it at the end"""
1045        steps = self._state.get('client', 'steps')
1046        steps.append(self.__create_step_tuple(fn, args, dargs))
1047        self._state.set('client', 'steps', steps)
1048
1049
1050    def next_step(self, fn, *args, **dargs):
1051        """Create a new step and place it after any steps added
1052        while running the current step but before any steps added in
1053        previous steps"""
1054        steps = self._state.get('client', 'steps')
1055        steps.insert(self._next_step_index,
1056                     self.__create_step_tuple(fn, args, dargs))
1057        self._next_step_index += 1
1058        self._state.set('client', 'steps', steps)
1059
1060
1061    def next_step_prepend(self, fn, *args, **dargs):
1062        """Insert a new step, executing first"""
1063        steps = self._state.get('client', 'steps')
1064        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
1065        self._next_step_index += 1
1066        self._state.set('client', 'steps', steps)
1067
1068
1069
1070    def _run_step_fn(self, local_vars, fn, args, dargs):
1071        """Run a (step) function within the given context"""
1072
1073        local_vars['__args'] = args
1074        local_vars['__dargs'] = dargs
1075        try:
1076            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
1077            return local_vars['__ret']
1078        except SystemExit:
1079            raise  # Send error.JobContinue and JobComplete on up to runjob.
1080        except error.TestNAError, detail:
1081            self.record(detail.exit_status, None, fn, str(detail))
1082        except Exception, detail:
1083            raise error.UnhandledJobError(detail)
1084
1085
1086    def _create_frame(self, global_vars, ancestry, fn_name):
1087        """Set up the environment like it would have been when this
1088        function was first defined.
1089
        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
1092        parent function and get back all child functions (i.e. those
1093        defined within it).
1094
1095        Unfortunately, the call stack of the function calling
1096        job.next_step might have been deeper than the function it
1097        added.  In order to make sure that the environment is what it
1098        should be, we need to then pop off the frames we built until
1099        we find the frame where the function was first defined."""
1100
1101        # The copies ensure that the parent frames are not modified
1102        # while building child frames.  This matters if we then
1103        # pop some frames in the next part of this function.
1104        current_frame = copy.copy(global_vars)
1105        frames = [current_frame]
1106        for steps_fn_name in ancestry:
1107            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
1108            current_frame = copy.copy(ret)
1109            frames.append(current_frame)
1110
1111        # Walk up the stack frames until we find the place fn_name was defined.
1112        while len(frames) > 2:
1113            if fn_name not in frames[-2]:
1114                break
1115            if frames[-2][fn_name] != frames[-1][fn_name]:
1116                break
1117            frames.pop()
1118            ancestry.pop()
1119
1120        return (frames[-1], ancestry)
1121
1122
1123    def _add_step_init(self, local_vars, current_function):
1124        """If the function returned a dictionary that includes a
1125        function named 'step_init', prepend it to our list of steps.
1126        This will only get run the first time a function with a nested
1127        use of the step engine is run."""
1128
1129        if (isinstance(local_vars, dict) and
1130            'step_init' in local_vars and
1131            callable(local_vars['step_init'])):
1132            # The init step is a child of the function
1133            # we were just running.
1134            self._current_step_ancestry.append(current_function)
1135            self.next_step_prepend('step_init')
1136
1137
1138    def step_engine(self):
1139        """The multi-run engine used when the control file defines step_init.
1140
        Runs the pending steps one at a time; if the job reboots part way
        through, it is resumed on the next boot and continues with the
        remaining steps.
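
        Example control file using the step engine (sketch; the step
        functions and test name are illustrative):

            def step_init():
                job.next_step(step_one)

            def step_one():
                job.run_test('sleeptest')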
1142        """
1143
1144        # Set up the environment and then interpret the control file.
1145        # Some control files will have code outside of functions,
1146        # which means we need to have our state engine initialized
1147        # before reading in the file.
1148        global_control_vars = {'job': self,
1149                               'args': self.args}
1150        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
1151        try:
1152            execfile(self.control, global_control_vars, global_control_vars)
1153        except error.TestNAError, detail:
1154            self.record(detail.exit_status, None, self.control, str(detail))
1155        except SystemExit:
1156            raise  # Send error.JobContinue and JobComplete on up to runjob.
1157        except Exception, detail:
1158            # Syntax errors or other general Python exceptions coming out of
1159            # the top level of the control file itself go through here.
1160            raise error.UnhandledJobError(detail)
1161
1162        # If we loaded in a mid-job state file, then we presumably
1163        # know what steps we have yet to run.
1164        if not self._is_continuation:
1165            if 'step_init' in global_control_vars:
1166                self.next_step(global_control_vars['step_init'])
1167        else:
1168            # if last job failed due to unexpected reboot, record it as fail
1169            # so harness gets called
1170            last_job = self._state.get('client', 'unexpected_reboot', None)
1171            if last_job:
1172                subdir, testname = last_job
1173                self.record('FAIL', subdir, testname, 'unexpected reboot')
1174                self.record('END FAIL', subdir, testname)
1175
1176        # Iterate through the steps.  If we reboot, we'll simply
1177        # continue iterating on the next step.
1178        while len(self._state.get('client', 'steps')) > 0:
1179            steps = self._state.get('client', 'steps')
1180            (ancestry, fn_name, args, dargs) = steps.pop(0)
1181            self._state.set('client', 'steps', steps)
1182
1183            self._next_step_index = 0
1184            ret = self._create_frame(global_control_vars, ancestry, fn_name)
1185            local_vars, self._current_step_ancestry = ret
1186            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
1187            self._add_step_init(local_vars, fn_name)
1188
1189
1190    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1191        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
1192                                   on_every_test)
1193
1194
1195    def add_sysinfo_logfile(self, file, on_every_test=False):
1196        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1197
1198
1199    def _add_sysinfo_loggable(self, loggable, on_every_test):
1200        if on_every_test:
1201            self.sysinfo.test_loggables.add(loggable)
1202        else:
1203            self.sysinfo.boot_loggables.add(loggable)
1204        self._save_sysinfo_state()
1205
1206
1207    def _load_sysinfo_state(self):
1208        state = self._state.get('client', 'sysinfo', None)
1209        if state:
1210            self.sysinfo.deserialize(state)
1211
1212
1213    def _save_sysinfo_state(self):
1214        state = self.sysinfo.serialize()
1215        self._state.set('client', 'sysinfo', state)
1216
1217
1218class disk_usage_monitor:
1219    def __init__(self, logging_func, device, max_mb_per_hour):
1220        self.func = logging_func
1221        self.device = device
1222        self.max_mb_per_hour = max_mb_per_hour
1223
1224
1225    def start(self):
1226        self.initial_space = utils.freespace(self.device)
1227        self.start_time = time.time()
1228
1229
1230    def stop(self):
1231        # if no maximum usage rate was set, we don't need to
1232        # generate any warnings
1233        if not self.max_mb_per_hour:
1234            return
1235
1236        final_space = utils.freespace(self.device)
1237        used_space = self.initial_space - final_space
1238        stop_time = time.time()
1239        total_time = stop_time - self.start_time
1240        # round up the time to one minute, to keep extremely short
1241        # tests from generating false positives due to short, badly
1242        # timed bursts of activity
1243        total_time = max(total_time, 60.0)
1244
1245        # determine the usage rate
1246        bytes_per_sec = used_space / total_time
1247        mb_per_sec = bytes_per_sec / 1024**2
1248        mb_per_hour = mb_per_sec * 60 * 60
1249
1250        if mb_per_hour > self.max_mb_per_hour:
1251            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
1252            msg %= (self.device, mb_per_hour)
1253            self.func(msg)
1254
1255
1256    @classmethod
1257    def watch(cls, *monitor_args, **monitor_dargs):
1258        """ Generic decorator to wrap a function call with the
1259        standard create-monitor -> start -> call -> stop idiom."""
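        # Example (sketch; the logging function and limit are illustrative):
        #
        #     @disk_usage_monitor.watch(logging.warning, "/", 500)
        #     def run_some_test():
        #         ...
        #
        # run_some_test() will then log a warning if it consumes disk space
        # on / faster than 500 MB/hour.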
1260        def decorator(func):
1261            def watched_func(*args, **dargs):
1262                monitor = cls(*monitor_args, **monitor_dargs)
1263                monitor.start()
1264                try:
1265                    func(*args, **dargs)
1266                finally:
1267                    monitor.stop()
1268            return watched_func
1269        return decorator
1270
1271
1272def runjob(control, drop_caches, options):
1273    """
1274    Run a job using the given control file.
1275
1276    This is the main interface to this module.
1277
1278    @see base_job.__init__ for parameter info.
1279    """
1280    control = os.path.abspath(control)
1281    state = control + '.state'
1282    # Ensure state file is cleaned up before the job starts to run if autotest
1283    # is not running with the --continue flag
1284    if not options.cont and os.path.isfile(state):
1285        logging.debug('Cleaning up previously found state file')
1286        os.remove(state)
1287
1288    # instantiate the job object ready for the control file.
1289    myjob = None
1290    try:
1291        # Check that the control file is valid
1292        if not os.path.exists(control):
1293            raise error.JobError(control + ": control file not found")
1294
1295        # When continuing, the job is complete when there is no
1296        # state file, ensure we don't try and continue.
1297        if options.cont and not os.path.exists(state):
1298            raise error.JobComplete("all done")
1299
1300        myjob = job(control=control, drop_caches=drop_caches, options=options)
1301
        # Load in the user's control file; it may do either of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
1305        myjob.step_engine()
1306
1307    except error.JobContinue:
1308        sys.exit(5)
1309
1310    except error.JobComplete:
1311        sys.exit(1)
1312
1313    except error.JobError, instance:
1314        logging.error("JOB ERROR: " + str(instance))
1315        if myjob:
1316            command = None
1317            if len(instance.args) > 1:
1318                command = instance.args[1]
1319                myjob.record('ABORT', None, command, str(instance))
1320            myjob.record('END ABORT', None, None, str(instance))
1321            assert myjob._record_indent == 0
1322            myjob.complete(1)
1323        else:
1324            sys.exit(1)
1325
1326    except Exception, e:
1327        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us.  If we get here, it's likely an autotest bug.
1329        msg = str(e) + '\n' + traceback.format_exc()
1330        logging.critical("JOB ERROR (autotest bug?): " + msg)
1331        if myjob:
1332            myjob.record('END ABORT', None, None, msg)
1333            assert myjob._record_indent == 0
1334            myjob.complete(1)
1335        else:
1336            sys.exit(1)
1337
1338    # If we get here, then we assume the job is complete and good.
1339    myjob.record('END GOOD', None, None)
1340    assert myjob._record_indent == 0
1341
1342    myjob.complete(0)
1343
1344
1345site_job = utils.import_site_class(
1346    __file__, "autotest_lib.client.bin.site_job", "site_job", base_client_job)
1347
1348class job(site_job):
1349    pass
1350