job.py revision ce1b0620fddfb6efa33cc9441d9c91ce06b30de3
1"""The main job wrapper
2
3This is the core infrastructure.
4
5Copyright Andy Whitcroft, Martin J. Bligh 2006
6"""
7
8import copy, os, platform, re, shutil, sys, time, traceback, types, glob
9import logging, getpass, errno, weakref
10import cPickle as pickle
11from autotest_lib.client.bin import client_logging_config
12from autotest_lib.client.bin import utils, parallel, kernel, xen
13from autotest_lib.client.bin import profilers, boottool, harness
14from autotest_lib.client.bin import config, sysinfo, test, local_host
15from autotest_lib.client.bin import partition as partition_lib
16from autotest_lib.client.common_lib import base_job
17from autotest_lib.client.common_lib import error, barrier, log, logging_manager
18from autotest_lib.client.common_lib import base_packages, packages
19from autotest_lib.client.common_lib import global_config
20from autotest_lib.client.tools import html_report
21
22
23LAST_BOOT_TAG = object()
24JOB_PREAMBLE = """
25from autotest_lib.client.common_lib.error import *
26from autotest_lib.client.bin.utils import *
27"""
28
29
30class StepError(error.AutotestError):
31    pass
32
33class NotAvailableError(error.AutotestError):
34    pass
35
36
37
38def _run_test_complete_on_exit(f):
39    """Decorator for job methods that automatically calls
40    self.harness.run_test_complete when the method exits, if appropriate."""
41    def wrapped(self, *args, **dargs):
42        try:
43            return f(self, *args, **dargs)
44        finally:
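            # Only the top-level status log triggers harness completion;
            # tasks forked by job.parallel() log to 'status.N' and skip this.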
45            if self._logger.global_filename == 'status':
46                self.harness.run_test_complete()
47                if self.drop_caches:
48                    logging.debug("Dropping caches")
49                    utils.drop_caches()
50    wrapped.__name__ = f.__name__
51    wrapped.__doc__ = f.__doc__
52    wrapped.__dict__.update(f.__dict__)
53    return wrapped
54
55
56class status_indenter(base_job.status_indenter):
57    """Provide a status indenter that is backed by job._record_prefix."""
58    def __init__(self, job):
59        self.job = weakref.proxy(job)  # avoid a circular reference
60
61
62    @property
63    def indent(self):
64        return self.job._record_indent
65
66
67    def increment(self):
68        self.job._record_indent += 1
69
70
71    def decrement(self):
72        self.job._record_indent -= 1
73
74
75class base_client_job(base_job.base_job):
76    """The client-side concrete implementation of base_job.
77
78    Optional properties provided by this implementation:
79        control
80        bootloader
81        harness
82    """
83
84    _WARNING_DISABLE_DELAY = 5
85
86    # _record_indent is a persistent property, but only on the client
87    _job_state = base_job.base_job._job_state
88    _record_indent = _job_state.property_factory(
89        '_state', '_record_indent', 0, namespace='client')
90    _max_disk_usage_rate = _job_state.property_factory(
91        '_state', '_max_disk_usage_rate', 0.0, namespace='client')
92
93
94    def __init__(self, control, options, drop_caches=True,
95                 extra_copy_cmdline=None):
96        """
97        Prepare a client side job object.
98
99        @param control: Pathname of the control file.
100        @param options: an object which includes:
101                jobtag: The job tag string (eg "default").
102                cont: Whether this is a continuation of a previous job.
103                harness_type: An alternative server harness.  [None]
104                use_external_logging: If true, the enable_external_logging
105                          method will be called during construction.  [False]
106        @param drop_caches: If true, utils.drop_caches() is called before and
107                between all tests.  [True]
108        @param extra_copy_cmdline: list of additional /proc/cmdline arguments to
109                copy from the running kernel to all kernels installed by this
110                job.
111        """
112        super(base_client_job, self).__init__(options=options)
113        self._pre_record_init(control, options)
114        try:
115            self._post_record_init(control, options, drop_caches,
116                                   extra_copy_cmdline)
117        except Exception, err:
118            self.record(
119                    'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
120                    str(err))
121            raise
122
123
124    @classmethod
125    def _get_environ_autodir(cls):
126        return os.environ['AUTODIR']
127
128
129    @classmethod
130    def _find_base_directories(cls):
131        """
132        Determine locations of autodir and clientdir (which are the same)
133        using os.environ. Serverdir does not exist in this context.
134        """
135        autodir = clientdir = cls._get_environ_autodir()
136        return autodir, clientdir, None
137
138
139    @classmethod
140    def _parse_args(cls, args):
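        # Split a whitespace-separated argument string, keeping quoted
        # substrings together, e.g. "-a 'opt one' -b" -> ["-a", "'opt one'", "-b"].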
141        return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)
142
143
144    def _find_resultdir(self, options):
145        """
146        Determine the directory for storing results. On a client this is
147        always <autodir>/results/<tag>, where tag is passed in on the command
148        line as an option.
149        """
150        return os.path.join(self.autodir, 'results', options.tag)
151
152
153    def _get_status_logger(self):
154        """Return a reference to the status logger."""
155        return self._logger
156
157
158    def _pre_record_init(self, control, options):
159        """
160        Initialization function that should perform ONLY the required
161        setup so that the self.record() method works.
162
163        As of now self.record() needs self.resultdir, self._group_level,
164        self.harness and of course self._logger.
165        """
166        if not options.cont:
167            self._cleanup_debugdir_files()
168            self._cleanup_results_dir()
169
170        logging_manager.configure_logging(
171            client_logging_config.ClientLoggingConfig(),
172            results_dir=self.resultdir,
173            verbose=options.verbose)
174        logging.info('Writing results to %s', self.resultdir)
175
176        # init_group_level needs the state
177        self.control = os.path.realpath(control)
178        self._is_continuation = options.cont
179        self._current_step_ancestry = []
180        self._next_step_index = 0
181        self._load_state()
182
183        _harness = self.handle_persistent_option(options, 'harness')
184        _harness_args = self.handle_persistent_option(options, 'harness_args')
185
186        self.harness = harness.select(_harness, self, _harness_args)
187
188        # set up the status logger
189        def client_job_record_hook(entry):
190            msg_tag = ''
191            if '.' in self._logger.global_filename:
192                msg_tag = self._logger.global_filename.split('.', 1)[1]
193            # send the entry to the job harness
194            message = '\n'.join([entry.message] + entry.extra_message_lines)
195            rendered_entry = self._logger.render_entry(entry)
196            self.harness.test_status_detail(entry.status_code, entry.subdir,
197                                            entry.operation, message, msg_tag,
198                                            entry.fields)
199            self.harness.test_status(rendered_entry, msg_tag)
200            # send the entry to stdout, if it's enabled
201            logging.info(rendered_entry)
202        self._logger = base_job.status_logger(
203            self, status_indenter(self), record_hook=client_job_record_hook,
204            tap_writer=self._tap)
205
206    def _post_record_init(self, control, options, drop_caches,
207                          extra_copy_cmdline):
208        """
209        Perform job initialization not required by self.record().
210        """
211        self._init_drop_caches(drop_caches)
212
213        self._init_packages()
214
215        self.sysinfo = sysinfo.sysinfo(self.resultdir)
216        self._load_sysinfo_state()
217
218        if not options.cont:
219            download = os.path.join(self.testdir, 'download')
220            if not os.path.exists(download):
221                os.mkdir(download)
222
223            shutil.copyfile(self.control,
224                            os.path.join(self.resultdir, 'control'))
225
226        self.control = control
227
228        self.logging = logging_manager.get_logging_manager(
229                manage_stdout_and_stderr=True, redirect_fds=True)
230        self.logging.start_logging()
231
232        self._config = config.config(self)
233        self.profilers = profilers.profilers(self)
234
235        self._init_bootloader()
236
237        self.machines = [options.hostname]
238        self.hosts = set([local_host.LocalHost(hostname=options.hostname,
239                                               bootloader=self.bootloader)])
240
241        self.args = []
242        if options.args:
243            self.args = self._parse_args(options.args)
244
245        if options.user:
246            self.user = options.user
247        else:
248            self.user = getpass.getuser()
249
250        self.sysinfo.log_per_reboot_data()
251
252        if not options.cont:
253            self.record('START', None, None)
254
255        self.harness.run_start()
256
257        if options.log:
258            self.enable_external_logging()
259
260        self._init_cmdline(extra_copy_cmdline)
261
262        self.num_tests_run = None
263        self.num_tests_failed = None
264
265        self.warning_loggers = None
266        self.warning_manager = None
267
268
269    def _init_drop_caches(self, drop_caches):
270        """
271        Perform the drop caches initialization.
272        """
273        self.drop_caches_between_iterations = (
274                       global_config.global_config.get_config_value('CLIENT',
275                                            'drop_caches_between_iterations',
276                                            type=bool, default=True))
277        self.drop_caches = drop_caches
278        if self.drop_caches:
279            logging.debug("Dropping caches")
280            utils.drop_caches()
281
282
283    def _init_bootloader(self):
284        """
285        Perform boottool initialization.
286        """
287        tool = self.config_get('boottool.executable')
288        self.bootloader = boottool.boottool(tool)
289
290
291    def _init_packages(self):
292        """
293        Perform the packages support initialization.
294        """
295        self.pkgmgr = packages.PackageManager(
296            self.autodir, run_function_dargs={'timeout':3600})
297
298
299    def _init_cmdline(self, extra_copy_cmdline):
300        """
301        Initialize default cmdline for booted kernels in this job.
302        """
303        copy_cmdline = set(['console'])
304        if extra_copy_cmdline is not None:
305            copy_cmdline.update(extra_copy_cmdline)
306
307        # extract console= and other args from cmdline and add them into the
308        # base args that we use for all kernels we install
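        # e.g. a running kernel booted with 'console=ttyS0,115200' will have
        # that argument propagated to every kernel this job installs.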
309        cmdline = utils.read_one_line('/proc/cmdline')
310        kernel_args = []
311        for karg in cmdline.split():
312            for param in copy_cmdline:
313                if karg.startswith(param) and \
314                    (len(param) == len(karg) or karg[len(param)] == '='):
315                    kernel_args.append(karg)
316        self.config_set('boot.default_args', ' '.join(kernel_args))
317
318
319    def _cleanup_results_dir(self):
320        """Delete everything in self.resultdir"""
321        assert os.path.exists(self.resultdir)
322        list_files = glob.glob('%s/*' % self.resultdir)
323        for f in list_files:
324            if os.path.isdir(f):
325                shutil.rmtree(f)
326            elif os.path.isfile(f):
327                os.remove(f)
328
329
330    def _cleanup_debugdir_files(self):
331        """
332        Delete any leftover debugdir files
333        """
334        list_files = glob.glob("/tmp/autotest_results_dir.*")
335        for f in list_files:
336            os.remove(f)
337
338
339    def disable_warnings(self, warning_type):
340        self.record("INFO", None, None,
341                    "disabling %s warnings" % warning_type,
342                    {"warnings.disable": warning_type})
343        time.sleep(self._WARNING_DISABLE_DELAY)
344
345
346    def enable_warnings(self, warning_type):
347        time.sleep(self._WARNING_DISABLE_DELAY)
348        self.record("INFO", None, None,
349                    "enabling %s warnings" % warning_type,
350                    {"warnings.enable": warning_type})
351
352
353    def monitor_disk_usage(self, max_rate):
354        """\
355        Signal that the job should monitor disk space usage on /
356        and generate a warning if a test uses up disk space at a
357        rate exceeding 'max_rate'.
358
359        Parameters:
360             max_rate - the maximum allowed rate of disk consumption
361                        during a test, in MB/hour, or 0 to indicate
362                        no limit.
363        """
364        self._max_disk_usage_rate = max_rate
365
366
367    def relative_path(self, path):
368        """\
369        Return a path relative to the job results directory.
370        """
371        head = len(self.resultdir) + 1     # remove the / in between
372        return path[head:]
373
374
375    def control_get(self):
376        return self.control
377
378
379    def control_set(self, control):
380        self.control = os.path.abspath(control)
381
382
383    def harness_select(self, which, harness_args):
384        self.harness = harness.select(which, self, harness_args)
385
386
387    def config_set(self, name, value):
388        self._config.set(name, value)
389
390
391    def config_get(self, name):
392        return self._config.get(name)
393
394
395    def setup_dirs(self, results_dir, tmp_dir):
396        if not tmp_dir:
397            tmp_dir = os.path.join(self.tmpdir, 'build')
398        if not os.path.exists(tmp_dir):
399            os.mkdir(tmp_dir)
400        if not os.path.isdir(tmp_dir):
401            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
402            raise ValueError(e_msg)
403
404        # We label the first build "build" and then subsequent ones
405        # as "build.2", "build.3", etc. Whilst this is a little bit
406        # inconsistent, 99.9% of jobs will only have one build
407        # (that's not done as kernbench, sparse, or buildtest),
408        # so it works out much cleaner. One of life's compromises.
409        if not results_dir:
410            results_dir = os.path.join(self.resultdir, 'build')
411            i = 2
412            while os.path.exists(results_dir):
413                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
414                i += 1
415        if not os.path.exists(results_dir):
416            os.mkdir(results_dir)
417
418        return (results_dir, tmp_dir)
419
420
421    def xen(self, base_tree, results_dir='', tmp_dir='', leave=False,
422            kjob=None):
423        """Summon a xen object"""
424        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
425        build_dir = 'xen'
426        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
427                       leave, kjob)
428
429
430    def kernel(self, base_tree, results_dir='', tmp_dir='', leave=False):
431        """Summon a kernel object"""
432        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
433        build_dir = 'linux'
434        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
435                                  build_dir, leave)
436
437
438    def barrier(self, *args, **kwds):
439        """Create a barrier object"""
440        return barrier.barrier(*args, **kwds)
441
442
443    def install_pkg(self, name, pkg_type, install_dir):
444        '''
445        This method is a simple wrapper around the actual package
446        installation method in the PackageManager class. It is used
447        internally by the profilers, deps and tests code.
448        name : name of the package (ex: sleeptest, dbench etc.)
449        pkg_type : Type of the package (ex: test, dep etc.)
450        install_dir : The directory into which the package source is
451                      untarred. (ex: client/profilers/<name> for profilers)
452        '''
453        if self.pkgmgr.repositories:
454            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)
455
456
457    def add_repository(self, repo_urls):
458        '''
459        Adds the repository locations to the job so that packages
460        can be fetched from them when needed. repo_urls must be a list
461        of URL strings.
462        Ex: job.add_repository(['http://blah1','http://blah2'])
463        '''
464        for repo_url in repo_urls:
465            self.pkgmgr.add_repository(repo_url)
466
467        # Fetch the packages' checksum file that contains the checksums
468        # of all the packages if it is not already fetched. The checksum
469        # is always fetched whenever a job is first started. This
470        # is not done in the job's constructor as we don't have the list of
471        # the repositories there (and obviously don't care about this file
472        # if we are not using the repos)
473        try:
474            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
475                                              base_packages.CHECKSUM_FILE)
476            self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE,
477                                  checksum_file_path, use_checksum=False)
478        except error.PackageFetchError:
479            # packaging system might not be working in this case
480            # Silently fall back to the normal case
481            pass
482
483
484    def require_gcc(self):
485        """
486        Test whether gcc is installed on the machine.
487        """
488        # check if gcc is installed on the system.
489        try:
490            utils.system('which gcc')
491        except error.CmdError, e:
492            raise NotAvailableError('gcc is required by this job and is '
493                                    'not available on the system')
494
495
496    def setup_dep(self, deps):
497        """Set up the dependencies for this test.
498        deps is a list of libraries required for this test.
499        """
500        # Fetch the deps from the repositories and set them up.
501        for dep in deps:
502            dep_dir = os.path.join(self.autodir, 'deps', dep)
503            # Search for the dependency in the repositories if specified,
504            # else check locally.
505            try:
506                self.install_pkg(dep, 'dep', dep_dir)
507            except error.PackageInstallError:
508                # see if the dep is there locally
509                pass
510
511            # dep_dir might not exist if it is not fetched from the repos
512            if not os.path.exists(dep_dir):
513                raise error.TestError("Dependency %s does not exist" % dep)
514
515            os.chdir(dep_dir)
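            # The dep provides a <dep>.py setup script; execfile() always
            # returns None, so logging below only means it ran without raising.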
516            if execfile('%s.py' % dep, {}) is None:
517                logging.info('Dependency %s successfully built', dep)
518
519
520    def _runtest(self, url, tag, timeout, args, dargs):
521        try:
522            l = lambda : test.runtest(self, url, tag, args, dargs)
523            pid = parallel.fork_start(self.resultdir, l)
524
525            if timeout:
526                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
527                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
528            else:
529                parallel.fork_waitfor(self.resultdir, pid)
530
531        except error.TestBaseException:
532            # These are already classified with an error type (exit_status)
533            raise
534        except error.JobError:
535            raise  # Caught further up and turned into an ABORT.
536        except Exception, e:
537            # Converts all other exceptions thrown by the test regardless
538            # of phase into a TestError(TestBaseException) subclass that
539            # reports them with their full stack trace.
540            raise error.UnhandledTestError(e)
541
542
543    def _run_test_base(self, url, *args, **dargs):
544        """
545        Prepares the arguments and group function used by run_test and run_test_detail.
546
547        @param url A url that identifies the test to run.
548        @param tag An optional keyword argument that will be added to the
549            test and subdir name.
550        @param subdir_tag An optional keyword argument that will be added
551            to the subdir name.
552
553        @returns:
554                subdir: Test subdirectory
555                testname: Test name
556                group_func: Actual test run function
557                timeout: Test timeout
558        """
559        group, testname = self.pkgmgr.get_package_name(url, 'test')
560        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
561        outputdir = self._make_test_outputdir(subdir)
562
563        timeout = dargs.pop('timeout', None)
564        if timeout:
565            logging.debug('Test has timeout: %d sec.', timeout)
566
567        def log_warning(reason):
568            self.record("WARN", subdir, testname, reason)
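        # Monitor free space on / while the test runs; log_warning() records a
        # WARN entry if usage exceeds the rate set via monitor_disk_usage().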
569        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
570        def group_func():
571            try:
572                self._runtest(url, tag, timeout, args, dargs)
573            except error.TestBaseException, detail:
574                # The error is already classified, record it properly.
575                self.record(detail.exit_status, subdir, testname, str(detail))
576                raise
577            else:
578                self.record('GOOD', subdir, testname, 'completed successfully')
579
580        return (subdir, testname, group_func, timeout)
581
582
583    @_run_test_complete_on_exit
584    def run_test(self, url, *args, **dargs):
585        """
586        Summon a test object and run it.
587
588        @param url A url that identifies the test to run.
589        @param tag An optional keyword argument that will be added to the
590            test and subdir name.
591        @param subdir_tag An optional keyword argument that will be added
592            to the subdir name.
593
594        @returns True if the test passes, False otherwise.
595        """
596        (subdir, testname, group_func, timeout) = self._run_test_base(url,
597                                                                      *args,
598                                                                      **dargs)
599        try:
600            self._rungroup(subdir, testname, group_func, timeout)
601            return True
602        except error.TestBaseException:
603            return False
604        # Any other exception here will be given to the caller
605        #
606        # NOTE: The only exception possible from the control file here
607        # is error.JobError as _runtest() turns all others into an
608        # UnhandledTestError that is caught above.
609
610
611    @_run_test_complete_on_exit
612    def run_test_detail(self, url, *args, **dargs):
613        """
614        Summon a test object and run it, returning test status.
615
616        @param url A url that identifies the test to run.
617        @param tag An optional keyword argument that will be added to the
618            test and subdir name.
619        @param subdir_tag An optional keyword argument that will be added
620            to the subdir name.
621
622        @returns Test status
623        @see: client/common_lib/error.py, exit_status
624        """
625        (subdir, testname, group_func, timeout) = self._run_test_base(url,
626                                                                      *args,
627                                                                      **dargs)
628        try:
629            self._rungroup(subdir, testname, group_func, timeout)
630            return 'GOOD'
631        except error.TestBaseException, detail:
632            return detail.exit_status
633
634
635    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
636        """\
637        subdir:
638                name of the group
639        testname:
640                name of the test to run, or support step
641        function:
642                subroutine to run
643        *args:
644                arguments for the function
645
646        Returns the result of the passed in function
647        """
648
649        try:
650            optional_fields = None
651            if timeout:
652                optional_fields = {}
653                optional_fields['timeout'] = timeout
654            self.record('START', subdir, testname,
655                        optional_fields=optional_fields)
656
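            # Remember what is running so that, if the machine reboots
            # unexpectedly, step_engine() can record it as a FAIL on resume.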
657            self._state.set('client', 'unexpected_reboot', (subdir, testname))
658            try:
659                result = function(*args, **dargs)
660                self.record('END GOOD', subdir, testname)
661                return result
662            except error.TestBaseException, e:
663                self.record('END %s' % e.exit_status, subdir, testname)
664                raise
665            except error.JobError, e:
666                self.record('END ABORT', subdir, testname)
667                raise
668            except Exception, e:
669                # This should only ever happen due to a bug in the given
670                # function's code.  The common case of being called by
671                # run_test() will never reach this.  If a control file called
672                # run_group() itself, bugs in its function will be caught
673                # here.
674                err_msg = str(e) + '\n' + traceback.format_exc()
675                self.record('END ERROR', subdir, testname, err_msg)
676                raise
677        finally:
678            self._state.discard('client', 'unexpected_reboot')
679
680
681    def run_group(self, function, tag=None, **dargs):
682        """
683        Run a function nested within a group level.
684
685        function:
686                Callable to run.
687        tag:
688                An optional tag name for the group.  If None (default)
689                function.__name__ will be used.
690        **dargs:
691                Named arguments for the function.
692        """
693        if tag:
694            name = tag
695        else:
696            name = function.__name__
697
698        try:
699            return self._rungroup(subdir=None, testname=name,
700                                  function=function, timeout=None, **dargs)
701        except (SystemExit, error.TestBaseException):
702            raise
703        # If there was a different exception, turn it into a TestError.
704        # It will be caught by step_engine or _run_step_fn.
705        except Exception, e:
706            raise error.UnhandledTestError(e)
707
708
709    def cpu_count(self):
710        return utils.count_cpus()  # use total system count
711
712
713    def start_reboot(self):
714        self.record('START', None, 'reboot')
715        self.record('GOOD', None, 'reboot.start')
716
717
718    def _record_reboot_failure(self, subdir, operation, status,
719                               running_id=None):
720        self.record("ABORT", subdir, operation, status)
721        if not running_id:
722            running_id = utils.running_os_ident()
723        kernel = {"kernel": running_id.split("::")[0]}
724        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
725
726
727    def _check_post_reboot(self, subdir, running_id=None):
728        """
729        Perform post-boot checks, such as whether the system configuration
730        has changed across the reboot (specifically, CPUs and partitions).
731
732        @param subdir: The subdir to use in the job.record call.
733        @param running_id: An optional running_id to include in the reboot
734            failure log message
735
736        @raise JobError: Raised if the current configuration does not match the
737            pre-reboot configuration.
738        """
739        # check to see if any partitions have changed
740        partition_list = partition_lib.get_partition_list(self,
741                                                          exclude_swap=False)
742        mount_info = partition_lib.get_mount_info(partition_list)
743        old_mount_info = self._state.get('client', 'mount_info')
744        if mount_info != old_mount_info:
745            new_entries = mount_info - old_mount_info
746            old_entries = old_mount_info - mount_info
747            description = ("mounted partitions are different after reboot "
748                           "(old entries: %s, new entries: %s)" %
749                           (old_entries, new_entries))
750            self._record_reboot_failure(subdir, "reboot.verify_config",
751                                        description, running_id=running_id)
752            raise error.JobError("Reboot failed: %s" % description)
753
754        # check to see if any CPUs have changed
755        cpu_count = utils.count_cpus()
756        old_count = self._state.get('client', 'cpu_count')
757        if cpu_count != old_count:
758            description = ('Number of CPUs changed after reboot '
759                           '(old count: %d, new count: %d)' %
760                           (old_count, cpu_count))
761            self._record_reboot_failure(subdir, 'reboot.verify_config',
762                                        description, running_id=running_id)
763            raise error.JobError('Reboot failed: %s' % description)
764
765
766    def end_reboot(self, subdir, kernel, patches, running_id=None):
767        self._check_post_reboot(subdir, running_id=running_id)
768
769        # strip ::<timestamp> from the kernel version if present
770        kernel = kernel.split("::")[0]
771        kernel_info = {"kernel": kernel}
772        for i, patch in enumerate(patches):
773            kernel_info["patch%d" % i] = patch
774        self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)
775
776
777    def end_reboot_and_verify(self, expected_when, expected_id, subdir,
778                              type='src', patches=[]):
779        """ Check the passed kernel identifier against the command line
780            and the running kernel; abort the job on mismatch. """
781
782        logging.info("POST BOOT: checking booted kernel "
783                     "mark=%d identity='%s' type='%s'",
784                     expected_when, expected_id, type)
785
786        running_id = utils.running_os_ident()
787
788        cmdline = utils.read_one_line("/proc/cmdline")
789
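        # Kernels installed by this job have an IDENT=<mark> argument added to
        # their boot command line; recover it to check which boot this was.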
790        find_sum = re.compile(r'.*IDENT=(\d+)')
791        m = find_sum.match(cmdline)
792        cmdline_when = -1
793        if m:
794            cmdline_when = int(m.groups()[0])
795
796        # We have all the facts, see if they indicate we
797        # booted the requested kernel or not.
798        bad = False
799        if (type == 'src' and expected_id != running_id or
800            type == 'rpm' and
801            not running_id.startswith(expected_id + '::')):
802            logging.error("Kernel identifier mismatch")
803            bad = True
804        if expected_when != cmdline_when:
805            logging.error("Kernel command line mismatch")
806            bad = True
807
808        if bad:
809            logging.error("   Expected Ident: " + expected_id)
810            logging.error("    Running Ident: " + running_id)
811            logging.error("    Expected Mark: %d", expected_when)
812            logging.error("Command Line Mark: %d", cmdline_when)
813            logging.error("     Command Line: " + cmdline)
814
815            self._record_reboot_failure(subdir, "reboot.verify", "boot failure",
816                                        running_id=running_id)
817            raise error.JobError("Reboot returned with the wrong kernel")
818
819        self.record('GOOD', subdir, 'reboot.verify',
820                    utils.running_os_full_version())
821        self.end_reboot(subdir, expected_id, patches, running_id=running_id)
822
823
824    def partition(self, device, loop_size=0, mountpoint=None):
825        """
826        Work with a machine partition
827
828            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
829            @param mountpoint: Directory to mount to. If not specified, the
830                               autotest tmp directory will be used.
831            @param loop_size: Size of loopback device (in MB). Defaults to 0.
832
833            @return: A L{client.bin.partition.partition} object
834        """
835
836        if not mountpoint:
837            mountpoint = self.tmpdir
838        return partition_lib.partition(self, device, loop_size, mountpoint)
839
840    @utils.deprecated
841    def filesystem(self, device, mountpoint=None, loop_size=0):
842        """ Same as partition
843
844        @deprecated: Use partition method instead
845        """
846        return self.partition(device, loop_size, mountpoint)
847
848
849    def enable_external_logging(self):
850        pass
851
852
853    def disable_external_logging(self):
854        pass
855
856
857    def reboot_setup(self):
858        # save the partition list and mount points, as well as the cpu count
859        partition_list = partition_lib.get_partition_list(self,
860                                                          exclude_swap=False)
861        mount_info = partition_lib.get_mount_info(partition_list)
862        self._state.set('client', 'mount_info', mount_info)
863        self._state.set('client', 'cpu_count', utils.count_cpus())
864
865
866    def reboot(self, tag=LAST_BOOT_TAG):
867        if tag == LAST_BOOT_TAG:
868            tag = self.last_boot_tag
869        else:
870            self.last_boot_tag = tag
871
872        self.reboot_setup()
873        self.harness.run_reboot()
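        # Depending on config, either make the tagged kernel the permanent
        # bootloader default or arrange to boot it only once.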
874        default = self.config_get('boot.set_default')
875        if default:
876            self.bootloader.set_default(tag)
877        else:
878            self.bootloader.boot_once(tag)
879
880        # HACK: using this as a module sometimes hangs shutdown, so if it's
881        # installed unload it first
882        utils.system("modprobe -r netconsole", ignore_status=True)
883
884        # sync first, so that a sync during shutdown doesn't time out
885        utils.system("sync; sync", ignore_status=True)
886
887        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
888        self.quit()
889
890
891    def noop(self, text):
892        logging.info("job: noop: " + text)
893
894
895    @_run_test_complete_on_exit
896    def parallel(self, *tasklist):
897        """Run tasks in parallel"""
898
899        pids = []
900        old_log_filename = self._logger.global_filename
901        for i, task in enumerate(tasklist):
902            assert isinstance(task, (tuple, list))
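            # Give each forked task its own status log (status.0, status.1, ...)
            # so parallel writes do not interleave; they are merged back below.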
903            self._logger.global_filename = old_log_filename + (".%d" % i)
904            def task_func():
905                # stub out _record_indent with a process-local one
906                base_record_indent = self._record_indent
907                proc_local = self._job_state.property_factory(
908                    '_state', '_record_indent.%d' % os.getpid(),
909                    base_record_indent, namespace='client')
910                self.__class__._record_indent = proc_local
911                task[0](*task[1:])
912            pids.append(parallel.fork_start(self.resultdir, task_func))
913
914        old_log_path = os.path.join(self.resultdir, old_log_filename)
915        old_log = open(old_log_path, "a")
916        exceptions = []
917        for i, pid in enumerate(pids):
918            # wait for the task to finish
919            try:
920                parallel.fork_waitfor(self.resultdir, pid)
921            except Exception, e:
922                exceptions.append(e)
923            # copy the logs from the subtask into the main log
924            new_log_path = old_log_path + (".%d" % i)
925            if os.path.exists(new_log_path):
926                new_log = open(new_log_path)
927                old_log.write(new_log.read())
928                new_log.close()
929                old_log.flush()
930                os.remove(new_log_path)
931        old_log.close()
932
933        self._logger.global_filename = old_log_filename
934
935        # handle any exceptions raised by the parallel tasks
936        if exceptions:
937            msg = "%d task(s) failed in job.parallel" % len(exceptions)
938            raise error.JobError(msg)
939
940
941    def quit(self):
942        # XXX: should have a better name.
943        self.harness.run_pause()
944        raise error.JobContinue("more to come")
945
946
947    def complete(self, status):
948        """Write pending TAP reports, clean up, and exit"""
949        # write out TAP reports
950        if self._tap.do_tap_report:
951            self._tap.write()
952            self._tap._write_tap_archive()
953
954        # write out a job HTML report
955        try:
956            html_report.create_report(self.resultdir)
957        except Exception, e:
958            logging.error("Error writing job HTML report: %s", e)
959
960        # We are about to exit, so move the job state file into the results dir.
961        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
962        shutil.move(self._state_file, dest)
963
964        self.harness.run_complete()
965        self.disable_external_logging()
966        sys.exit(status)
967
968
969    def _load_state(self):
970        # grab any initial state and set up $CONTROL.state as the backing file
971        init_state_file = self.control + '.init.state'
972        self._state_file = self.control + '.state'
973        if os.path.exists(init_state_file):
974            shutil.move(init_state_file, self._state_file)
975        self._state.set_backing_file(self._state_file)
976
977        # initialize the state engine, if necessary
978        has_steps = self._state.has('client', 'steps')
979        if not self._is_continuation and has_steps:
980            raise RuntimeError('Loaded state can only contain client.steps if '
981                               'this is a continuation')
982
983        if not has_steps:
984            logging.info('Initializing the state engine')
985            self._state.set('client', 'steps', [])
986
987
988    def handle_persistent_option(self, options, option_name):
989        """
990        Select an option from the command line or persistent state.
991        Store the selected option so a standalone client can continue
992        after a reboot with the previously selected options.
993        Priority:
994        1. explicitly specified via command line
995        2. stored in state file (if continuing job '-c')
996        3. default == None
997        """
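        # Example: a harness chosen on the original command line is saved to
        # the state file, so a continuation (-c) after a reboot reuses it.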
998        option = None
999        cmd_line_option = getattr(options, option_name)
1000        if cmd_line_option:
1001            option = cmd_line_option
1002            self._state.set('client', option_name, option)
1003        else:
1004            stored_option = self._state.get('client', option_name, None)
1005            if stored_option:
1006                option = stored_option
1007        logging.debug('Persistent option %s now set to %s', option_name, option)
1008        return option
1009
1010
1011    def __create_step_tuple(self, fn, args, dargs):
1012        # Legacy code passes in an array where the first arg is
1013        # the function or its name.
1014        if isinstance(fn, list):
1015            assert(len(args) == 0)
1016            assert(len(dargs) == 0)
1017            args = fn[1:]
1018            fn = fn[0]
1019        # Pickling actual functions is hairy, thus we have to call
1020        # them by name.  Unfortunately, this means only functions
1021        # defined globally can be used as a next step.
1022        if callable(fn):
1023            fn = fn.__name__
1024        if not isinstance(fn, types.StringTypes):
1025            raise StepError("Next steps must be functions or "
1026                            "strings containing the function name")
1027        ancestry = copy.copy(self._current_step_ancestry)
1028        return (ancestry, fn, args, dargs)
1029
1030
1031    def next_step_append(self, fn, *args, **dargs):
1032        """Define the next step and place it at the end"""
1033        steps = self._state.get('client', 'steps')
1034        steps.append(self.__create_step_tuple(fn, args, dargs))
1035        self._state.set('client', 'steps', steps)
1036
1037
1038    def next_step(self, fn, *args, **dargs):
1039        """Create a new step and place it after any steps added
1040        while running the current step but before any steps added in
1041        previous steps"""
1042        steps = self._state.get('client', 'steps')
1043        steps.insert(self._next_step_index,
1044                     self.__create_step_tuple(fn, args, dargs))
1045        self._next_step_index += 1
1046        self._state.set('client', 'steps', steps)
1047
1048
1049    def next_step_prepend(self, fn, *args, **dargs):
1050        """Insert a new step, executing first"""
1051        steps = self._state.get('client', 'steps')
1052        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
1053        self._next_step_index += 1
1054        self._state.set('client', 'steps', steps)
1055
1056
1057
1058    def _run_step_fn(self, local_vars, fn, args, dargs):
1059        """Run a (step) function within the given context"""
1060
1061        local_vars['__args'] = args
1062        local_vars['__dargs'] = dargs
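        # fn is a function *name* (see __create_step_tuple), so look it up and
        # call it inside the supplied namespace rather than pickling a function.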
1063        try:
1064            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
1065            return local_vars['__ret']
1066        except SystemExit:
1067            raise  # Send error.JobContinue and JobComplete on up to runjob.
1068        except error.TestNAError, detail:
1069            self.record(detail.exit_status, None, fn, str(detail))
1070        except Exception, detail:
1071            raise error.UnhandledJobError(detail)
1072
1073
1074    def _create_frame(self, global_vars, ancestry, fn_name):
1075        """Set up the environment like it would have been when this
1076        function was first defined.
1077
1078        Child step engine 'implementations' must have 'return locals()'
1079        at the end of their steps.  Because of this, we can call the
1080        parent function and get back all child functions (i.e. those
1081        defined within it).
1082
1083        Unfortunately, the call stack of the function calling
1084        job.next_step might have been deeper than the function it
1085        added.  In order to make sure that the environment is what it
1086        should be, we need to then pop off the frames we built until
1087        we find the frame where the function was first defined."""
1088
1089        # The copies ensure that the parent frames are not modified
1090        # while building child frames.  This matters if we then
1091        # pop some frames in the next part of this function.
1092        current_frame = copy.copy(global_vars)
1093        frames = [current_frame]
1094        for steps_fn_name in ancestry:
1095            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
1096            current_frame = copy.copy(ret)
1097            frames.append(current_frame)
1098
1099        # Walk up the stack frames until we find the place fn_name was defined.
1100        while len(frames) > 2:
1101            if fn_name not in frames[-2]:
1102                break
1103            if frames[-2][fn_name] != frames[-1][fn_name]:
1104                break
1105            frames.pop()
1106            ancestry.pop()
1107
1108        return (frames[-1], ancestry)
1109
1110
1111    def _add_step_init(self, local_vars, current_function):
1112        """If the function returned a dictionary that includes a
1113        function named 'step_init', prepend it to our list of steps.
1114        This will only get run the first time a function with a nested
1115        use of the step engine is run."""
1116
1117        if (isinstance(local_vars, dict) and
1118            'step_init' in local_vars and
1119            callable(local_vars['step_init'])):
1120            # The init step is a child of the function
1121            # we were just running.
1122            self._current_step_ancestry.append(current_function)
1123            self.next_step_prepend('step_init')
1124
1125
1126    def step_engine(self):
1127        """The multi-run engine used when the control file defines step_init.
1128
1129        Does the next step.
1130        """
1131
1132        # Set up the environment and then interpret the control file.
1133        # Some control files will have code outside of functions,
1134        # which means we need to have our state engine initialized
1135        # before reading in the file.
1136        global_control_vars = {'job': self,
1137                               'args': self.args}
1138        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
1139        try:
1140            execfile(self.control, global_control_vars, global_control_vars)
1141        except error.TestNAError, detail:
1142            self.record(detail.exit_status, None, self.control, str(detail))
1143        except SystemExit:
1144            raise  # Send error.JobContinue and JobComplete on up to runjob.
1145        except Exception, detail:
1146            # Syntax errors or other general Python exceptions coming out of
1147            # the top level of the control file itself go through here.
1148            raise error.UnhandledJobError(detail)
1149
1150        # If we loaded in a mid-job state file, then we presumably
1151        # know what steps we have yet to run.
1152        if not self._is_continuation:
1153            if 'step_init' in global_control_vars:
1154                self.next_step(global_control_vars['step_init'])
1155        else:
1156            # if last job failed due to unexpected reboot, record it as fail
1157            # so harness gets called
1158            last_job = self._state.get('client', 'unexpected_reboot', None)
1159            if last_job:
1160                subdir, testname = last_job
1161                self.record('FAIL', subdir, testname, 'unexpected reboot')
1162                self.record('END FAIL', subdir, testname)
1163
1164        # Iterate through the steps.  If we reboot, we'll simply
1165        # continue iterating on the next step.
1166        while len(self._state.get('client', 'steps')) > 0:
1167            steps = self._state.get('client', 'steps')
1168            (ancestry, fn_name, args, dargs) = steps.pop(0)
1169            self._state.set('client', 'steps', steps)
1170
1171            self._next_step_index = 0
1172            ret = self._create_frame(global_control_vars, ancestry, fn_name)
1173            local_vars, self._current_step_ancestry = ret
1174            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
1175            self._add_step_init(local_vars, fn_name)
1176
1177
1178    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1179        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
1180                                   on_every_test)
1181
1182
1183    def add_sysinfo_logfile(self, file, on_every_test=False):
1184        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1185
1186
1187    def _add_sysinfo_loggable(self, loggable, on_every_test):
1188        if on_every_test:
1189            self.sysinfo.test_loggables.add(loggable)
1190        else:
1191            self.sysinfo.boot_loggables.add(loggable)
1192        self._save_sysinfo_state()
1193
1194
1195    def _load_sysinfo_state(self):
1196        state = self._state.get('client', 'sysinfo', None)
1197        if state:
1198            self.sysinfo.deserialize(state)
1199
1200
1201    def _save_sysinfo_state(self):
1202        state = self.sysinfo.serialize()
1203        self._state.set('client', 'sysinfo', state)
1204
1205
1206class disk_usage_monitor:
1207    def __init__(self, logging_func, device, max_mb_per_hour):
1208        self.func = logging_func
1209        self.device = device
1210        self.max_mb_per_hour = max_mb_per_hour
1211
1212
1213    def start(self):
1214        self.initial_space = utils.freespace(self.device)
1215        self.start_time = time.time()
1216
1217
1218    def stop(self):
1219        # if no maximum usage rate was set, we don't need to
1220        # generate any warnings
1221        if not self.max_mb_per_hour:
1222            return
1223
1224        final_space = utils.freespace(self.device)
1225        used_space = self.initial_space - final_space
1226        stop_time = time.time()
1227        total_time = stop_time - self.start_time
1228        # round up the time to one minute, to keep extremely short
1229        # tests from generating false positives due to short, badly
1230        # timed bursts of activity
1231        total_time = max(total_time, 60.0)
1232
1233        # determine the usage rate
1234        bytes_per_sec = used_space / total_time
1235        mb_per_sec = bytes_per_sec / 1024**2
1236        mb_per_hour = mb_per_sec * 60 * 60
1237
1238        if mb_per_hour > self.max_mb_per_hour:
1239            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
1240            msg %= (self.device, mb_per_hour)
1241            self.func(msg)
1242
1243
1244    @classmethod
1245    def watch(cls, *monitor_args, **monitor_dargs):
1246        """ Generic decorator to wrap a function call with the
1247        standard create-monitor -> start -> call -> stop idiom."""
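        # Typical use (see _run_test_base above):
        #   @disk_usage_monitor.watch(log_warning, "/", max_rate)
        #   def group_func(): ...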
1248        def decorator(func):
1249            def watched_func(*args, **dargs):
1250                monitor = cls(*monitor_args, **monitor_dargs)
1251                monitor.start()
1252                try:
1253                    return func(*args, **dargs)
1254                finally:
1255                    monitor.stop()
1256            return watched_func
1257        return decorator
1258
1259
1260def runjob(control, drop_caches, options):
1261    """
1262    Run a job using the given control file.
1263
1264    This is the main interface to this module.
1265
1266    @see base_job.__init__ for parameter info.
1267    """
1268    control = os.path.abspath(control)
1269    state = control + '.state'
1270    # Ensure state file is cleaned up before the job starts to run if autotest
1271    # is not running with the --continue flag
1272    if not options.cont and os.path.isfile(state):
1273        logging.debug('Cleaning up previously found state file')
1274        os.remove(state)
1275
1276    # instantiate the job object ready for the control file.
1277    myjob = None
1278    try:
1279        # Check that the control file is valid
1280        if not os.path.exists(control):
1281            raise error.JobError(control + ": control file not found")
1282
1283        # When continuing, the absence of a state file means the job is
1284        # already complete; ensure we don't try to continue it.
1285        if options.cont and not os.path.exists(state):
1286            raise error.JobComplete("all done")
1287
1288        myjob = job(control=control, drop_caches=drop_caches, options=options)
1289
1290        # Load in the user's control file; it may do any one of:
1291        #  1) execute in toto
1292        #  2) define steps, and select the first via next_step()
1293        myjob.step_engine()
1294
1295    except error.JobContinue:
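        # Exit status 5 marks an intentional pause (e.g. a reboot scheduled by
        # job.quit()) so the surrounding wrapper can resume the job with -c.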
1296        sys.exit(5)
1297
1298    except error.JobComplete:
1299        sys.exit(1)
1300
1301    except error.JobError, instance:
1302        logging.error("JOB ERROR: " + str(instance))
1303        if myjob:
1304            command = None
1305            if len(instance.args) > 1:
1306                command = instance.args[1]
1307                myjob.record('ABORT', None, command, str(instance))
1308            myjob.record('END ABORT', None, None, str(instance))
1309            assert myjob._record_indent == 0
1310            myjob.complete(1)
1311        else:
1312            sys.exit(1)
1313
1314    except Exception, e:
1315        # NOTE: job._run_step_fn and job.step_engine will turn things into
1316        # a JobError for us.  If we get here, it's likely an autotest bug.
1317        msg = str(e) + '\n' + traceback.format_exc()
1318        logging.critical("JOB ERROR (autotest bug?): " + msg)
1319        if myjob:
1320            myjob.record('END ABORT', None, None, msg)
1321            assert myjob._record_indent == 0
1322            myjob.complete(1)
1323        else:
1324            sys.exit(1)
1325
1326    # If we get here, then we assume the job is complete and good.
1327    myjob.record('END GOOD', None, None)
1328    assert myjob._record_indent == 0
1329
1330    myjob.complete(0)
1331
1332
1333site_job = utils.import_site_class(
1334    __file__, "autotest_lib.client.bin.site_job", "site_job", base_client_job)
1335
1336class job(site_job):
1337    pass
1338