# job.py revision 8904d6f41c99d44b01bcd60eab70f2ac5ef25337
"""The main job wrapper

This is the core infrastructure.

Copyright Andy Whitcroft, Martin J. Bligh 2006
"""

import copy, os, platform, re, shutil, sys, time, traceback, types, glob
import logging
import cPickle as pickle
from autotest_lib.client.bin import client_logging_config
from autotest_lib.client.bin import utils, parallel, kernel, xen
from autotest_lib.client.bin import profilers, boottool, harness
from autotest_lib.client.bin import config, sysinfo, test, local_host
from autotest_lib.client.bin import partition as partition_lib
from autotest_lib.client.common_lib import error, barrier, log, logging_manager
from autotest_lib.client.common_lib import base_packages, packages

LAST_BOOT_TAG = object()
NO_DEFAULT = object()
JOB_PREAMBLE = """
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.bin.utils import *
"""


class StepError(error.AutotestError):
    pass

class NotAvailableError(error.AutotestError):
    pass



def _run_test_complete_on_exit(f):
    """Decorator for job methods that automatically calls
    self.harness.run_test_complete when the method exits, if appropriate."""
    def wrapped(self, *args, **dargs):
        try:
            return f(self, *args, **dargs)
        finally:
            if self.log_filename == self.DEFAULT_LOG_FILENAME:
                self.harness.run_test_complete()
                if self.drop_caches:
                    logging.debug("Dropping caches")
                    utils.drop_caches()
    wrapped.__name__ = f.__name__
    wrapped.__doc__ = f.__doc__
    wrapped.__dict__.update(f.__dict__)
    return wrapped


class base_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
                    Comes from os.environ['AUTODIR'].
            bindir
                    <autodir>/bin/
            libdir
                    <autodir>/lib/
            testdir
                    <autodir>/tests/
            configdir
                    <autodir>/config/
            site_testdir
                    <autodir>/site_tests/
            profdir
                    <autodir>/profilers/
            tmpdir
                    <autodir>/tmp/
            pkgdir
                    <autodir>/packages/
            resultdir
                    <autodir>/results/<jobtag>
            toolsdir
                    <autodir>/tools/
            logging
                    logging_manager.LoggingManager object
            profilers
                    the profilers object for this job
            harness
                    the server harness object for this job
            config
                    the job configuration for this job
            drop_caches_between_iterations
                    drop the pagecache between each iteration
            default_profile_only
                    default value for the test.execute profile_only parameter
    """

    DEFAULT_LOG_FILENAME = "status"
    WARNING_DISABLE_DELAY = 5

    def __init__(self, control, options, drop_caches=True,
                 extra_copy_cmdline=None):
        """
        Prepare a client side job object.

        @param control: The control file (pathname of).
        @param options: an object which includes:
                jobtag: The job tag string (eg "default").
                cont: If this is the continuation of this job.
                harness_type: An alternative server harness.  [None]
                use_external_logging: If true, the enable_external_logging
                          method will be called during construction.  [False]
        @param drop_caches: If true, utils.drop_caches() is called before and
                between all tests.  [True]
        @param extra_copy_cmdline: list of additional /proc/cmdline arguments to
                copy from the running kernel to all the installed kernels with
                this job
        """
        self._pre_record_init(control, options)
        try:
            self._post_record_init(control, options, drop_caches,
                                   extra_copy_cmdline)
        except Exception, err:
            self.record(
                    'ABORT', None, None, 'client.bin.job.__init__ failed: %s' %
                    str(err))
            raise


    def _pre_record_init(self, control, options):
        """
        Initialization function that should perform ONLY the required
        setup so that the self.record() method works.

        As of now self.record() needs self.resultdir, self.group_level,
        self.log_filename, self.harness.
        """
        self.autodir = os.environ['AUTODIR']
        self.resultdir = os.path.join(self.autodir, 'results', options.tag)
        self.tmpdir = os.path.join(self.autodir, 'tmp')

        if not os.path.exists(self.resultdir):
            os.makedirs(self.resultdir)

        if not options.cont:
            self._cleanup_results_dir()
            # Don't cleanup the tmp dir (which contains the lockfile)
            # in the constructor, this would be a problem for multiple
            # jobs starting at the same time on the same client. Instead
            # do the delete at the server side. We simply create the tmp
            # directory here if it does not already exist.
            if not os.path.exists(self.tmpdir):
                os.mkdir(self.tmpdir)

        logging_manager.configure_logging(
                client_logging_config.ClientLoggingConfig(),
                results_dir=self.resultdir,
                verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)

        self.log_filename = self.DEFAULT_LOG_FILENAME

        # init_group_level needs the state
        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        self.state_file = self.control + '.state'
        self.current_step_ancestry = []
        self.next_step_index = 0
        self.testtag = ''
        self._test_tag_prefix = ''
        self._load_state()

        self._init_group_level()

        self.harness = harness.select(options.harness, self)


    def _post_record_init(self, control, options, drop_caches,
                          extra_copy_cmdline):
        """
        Perform job initialization not required by self.record().
        """
        self.bindir = os.path.join(self.autodir, 'bin')
        self.libdir = os.path.join(self.autodir, 'lib')
        self.testdir = os.path.join(self.autodir, 'tests')
        self.configdir = os.path.join(self.autodir, 'config')
        self.site_testdir = os.path.join(self.autodir, 'site_tests')
        self.profdir = os.path.join(self.autodir, 'profilers')
        self.toolsdir = os.path.join(self.autodir, 'tools')

        self._init_drop_caches(drop_caches)

        self._init_packages()
        self.default_profile_only = self.get_state("__default_profile_only",
                                                   default=False)
        self.run_test_cleanup = self.get_state("__run_test_cleanup",
                                                default=True)

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        self.last_boot_tag = self.get_state("__last_boot_tag", default=None)
        self.tag = self.get_state("__job_tag", default=None)

        if not options.cont:
            if not os.path.exists(self.pkgdir):
                os.mkdir(self.pkgdir)

            results = os.path.join(self.autodir, 'results')
            if not os.path.exists(results):
                os.mkdir(results)

            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            os.makedirs(os.path.join(self.resultdir, 'analysis'))

            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))


        self.control = control
        self.jobtag = options.tag

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self.config = config.config(self)
        self.profilers = profilers.profilers(self)
        self.host = local_host.LocalHost(hostname=options.hostname)
        self.autoserv_user = options.autoserv_user

        self._init_bootloader()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)
            self._increment_group_level()

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        # load the max disk usage rate - default to no monitoring
        self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0)

        self._init_cmdline(extra_copy_cmdline)


    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        self.drop_caches_between_iterations = True
        self.drop_caches = drop_caches
        if self.drop_caches:
            logging.debug("Dropping caches")
            utils.drop_caches()


    def _init_bootloader(self):
        """
        Perform boottool initialization.
        """
        try:
            tool = self.config_get('boottool.executable')
            self.bootloader = boottool.boottool(tool)
        except:
            pass


    def _init_packages(self):
        """
        Perform the packages support initialization.
        """
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':3600})
        self.pkgdir = os.path.join(self.autodir, 'packages')


    def _init_cmdline(self, extra_copy_cmdline):
        """
        Initialize default cmdline for booted kernels in this job.
        """
        copy_cmdline = set(['console'])
        if extra_copy_cmdline is not None:
            copy_cmdline.update(extra_copy_cmdline)

        # extract console= and other args from cmdline and add them into the
        # base args that we use for all kernels we install
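        # For illustration (hypothetical values): a kernel booted with
        #   root=/dev/sda1 ro console=ttyS0,115200 quiet
        # would contribute 'console=ttyS0,115200' to boot.default_args below.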
        cmdline = utils.read_one_line('/proc/cmdline')
        kernel_args = []
        for karg in cmdline.split():
            for param in copy_cmdline:
                if karg.startswith(param) and \
                    (len(param) == len(karg) or karg[len(param)] == '='):
                    kernel_args.append(karg)
        self.config_set('boot.default_args', ' '.join(kernel_args))


    def _cleanup_results_dir(self):
        """Delete everything in resultdir"""
        assert os.path.exists(self.resultdir)
        list_files = glob.glob('%s/*' % self.resultdir)
        for f in list_files:
            if os.path.isdir(f):
                shutil.rmtree(f)
            elif os.path.isfile(f):
                os.remove(f)


    def disable_warnings(self, warning_type):
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})
        time.sleep(self.WARNING_DISABLE_DELAY)


    def enable_warnings(self, warning_type):
        time.sleep(self.WARNING_DISABLE_DELAY)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def monitor_disk_usage(self, max_rate):
        """\
        Signal that the job should monitor disk space usage on /
        and generate a warning if a test uses up disk space at a
        rate exceeding 'max_rate'.

        Parameters:
             max_rate - the maximum allowed rate of disk consumption
                        during a test, in MB/hour, or 0 to indicate
                        no limit.
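
        Example (hypothetical limit): job.monitor_disk_usage(500) will warn
        if a test consumes disk space on / faster than 500 MB/hour.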
        """
        self.set_state('__monitor_disk', max_rate)
        self.max_disk_usage_rate = max_rate


    def relative_path(self, path):
        """\
        Return a path relative to the job results directory
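
        Example (hypothetical paths): with resultdir
        '/usr/local/autotest/results/default', the path
        '/usr/local/autotest/results/default/dbench/keyval' becomes
        'dbench/keyval'.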
        """
        head = len(self.resultdir) + 1     # remove the '/' in between
        return path[head:]


    def control_get(self):
        return self.control


    def control_set(self, control):
        self.control = os.path.abspath(control)


    def harness_select(self, which):
        self.harness = harness.select(which, self)


    def config_set(self, name, value):
        self.config.set(name, value)


    def config_get(self, name):
        return self.config.get(name)


    def setup_dirs(self, results_dir, tmp_dir):
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def xen(self, base_tree, results_dir='', tmp_dir='', leave=False,
            kjob=None):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)


    def kernel(self, base_tree, results_dir='', tmp_dir='', leave=False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
                                  build_dir, leave)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory in which the source is actually
                      untarred into. (ex: client/profilers/<name> for profilers)
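
        Example (a sketch; the dbench test and its usual install path are
        assumed):
            job.install_pkg('dbench', 'test',
                            os.path.join(job.autodir, 'tests', 'dbench'))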
        '''
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a string list
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              base_packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass


    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError, e:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
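
        Example (hypothetical dependency names):
            job.setup_dep(['libaio', 'libcap'])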
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            utils.system('./' + dep + '.py')


    def _runtest(self, url, tag, args, dargs):
        try:
            l = lambda: test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)
            parallel.fork_waitfor(self.resultdir, pid)
        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run
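
        Example (hypothetical test name and arguments):
                job.run_test('sleeptest', seconds=30, tag='quick')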
        """

        if not url:
            raise TypeError("Test name is invalid. "
                            "Switched arguments?")
        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
        namelen = len(testname)
        dargs = dargs.copy()
        tntag = dargs.pop('tag', None)
        if tntag:         # per-test tag is included in reported test name
            testname += '.' + str(tntag)
        run_number = self.get_run_number()
        if run_number:
            testname += '._%02d_' % run_number
            self.set_run_number(run_number + 1)
        if self._is_kernel_in_test_tag():
            testname += '.' + platform.release()
        if self._test_tag_prefix:
            testname += '.' + self._test_tag_prefix
        if self.testtag:  # job-level tag is included in reported test name
            testname += '.' + self.testtag
        subdir = testname
        sdtag = dargs.pop('subdir_tag', None)
        if sdtag:  # subdir-only tag is not included in reports
            subdir = subdir + '.' + str(sdtag)
        tag = subdir[namelen+1:]    # '' if none

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                    " already run with tag <%s>"
                    % (outputdir, testname, tag))
            raise error.TestError(msg)
        # NOTE: client/common_lib/test.py runtest() depends on directory names
        # being constructed the same way as in this code.
        os.mkdir(outputdir)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            self._rungroup(subdir, testname, group_func)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.


    def _rungroup(self, subdir, testname, function, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed in function
        """

        try:
            self.record('START', subdir, testname)
            self._increment_group_level()
            result = function(*args, **dargs)
            self._decrement_group_level()
            self.record('END GOOD', subdir, testname)
            return result
        except error.TestBaseException, e:
            self._decrement_group_level()
            self.record('END %s' % e.exit_status, subdir, testname)
            raise
        except error.JobError, e:
            self._decrement_group_level()
            self.record('END ABORT', subdir, testname)
            raise
        except Exception, e:
            # This should only ever happen due to a bug in the given
            # function's code.  The common case of being called by
            # run_test() will never reach this.  If a control file called
            # run_group() itself, bugs in its function will be caught
            # here.
            self._decrement_group_level()
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END ERROR', subdir, testname, err_msg)
            raise


    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group.  If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
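
        Example (hypothetical function and arguments from a control file):
                job.run_group(install_my_kernel, tag='install',
                              version='2.6.18')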
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)


    _RUN_NUMBER_STATE = '__run_number'
    def get_run_number(self):
        """Get the run number or 0 if no run number has been set."""
        return self.get_state(self._RUN_NUMBER_STATE, default=0)


    def set_run_number(self, number):
        """If the run number is non-zero it will be in the output dir name."""
        self.set_state(self._RUN_NUMBER_STATE, number)


    _KERNEL_IN_TAG_STATE = '__kernel_version_in_test_tag'
    def _is_kernel_in_test_tag(self):
        """Boolean: should the kernel version be included in the test tag."""
        return self.get_state(self._KERNEL_IN_TAG_STATE, default=False)


    def show_kernel_in_test_tag(self, value=True):
        """If True, the kernel version at test time will prefix test tags."""
        self.set_state(self._KERNEL_IN_TAG_STATE, value)


    def set_test_tag(self, tag=''):
        """Set tag to be added to test name of all following run_test steps."""
        self.testtag = tag


    def set_test_tag_prefix(self, prefix):
        """Set a prefix string to prepend to all future run_test steps.

        Args:
          prefix: A string to prepend to any run_test() step tags separated by
              a '.'; use the empty string '' to clear it.
        """
        self._test_tag_prefix = prefix


    def cpu_count(self):
        return utils.count_cpus()  # use total system count


    def start_reboot(self):
        self.record('START', None, 'reboot')
        self._increment_group_level()
        self.record('GOOD', None, 'reboot.start')


    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        self.record("ABORT", subdir, operation, status)
        self._decrement_group_level()
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)


    def _check_post_reboot(self, subdir, running_id=None):
        """
        Perform post-boot checks, such as whether the mounted partition
        list has changed across reboots.
        """
        partition_list = partition_lib.get_partition_list(self)
        mount_info = set((p.device, p.get_mountpoint()) for p in partition_list)
        old_mount_info = self.get_state("__mount_info")
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)


    def end_reboot(self, subdir, kernel, patches, running_id=None):
        self._check_post_reboot(subdir, running_id=running_id)

        # strip ::<timestamp> from the kernel version if present
        kernel = kernel.split("::")[0]
        kernel_info = {"kernel": kernel}
        for i, patch in enumerate(patches):
            kernel_info["patch%d" % i] = patch
        self._decrement_group_level()
        self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)


    def end_reboot_and_verify(self, expected_when, expected_id, subdir,
                              type='src', patches=[]):
        """ Check the passed kernel identifier against the command line
            and the running kernel, abort the job on mismatch. """

        logging.info("POST BOOT: checking booted kernel "
                     "mark=%d identity='%s' type='%s'",
                     expected_when, expected_id, type)

        running_id = utils.running_os_ident()

        cmdline = utils.read_one_line("/proc/cmdline")

        find_sum = re.compile(r'.*IDENT=(\d+)')
        m = find_sum.match(cmdline)
        cmdline_when = -1
        if m:
            cmdline_when = int(m.groups()[0])

        # We have all the facts, see if they indicate we
        # booted the requested kernel or not.
        bad = False
        if (type == 'src' and expected_id != running_id or
            type == 'rpm' and
            not running_id.startswith(expected_id + '::')):
            logging.error("Kernel identifier mismatch")
            bad = True
        if expected_when != cmdline_when:
            logging.error("Kernel command line mismatch")
            bad = True

        if bad:
            logging.error("   Expected Ident: " + expected_id)
            logging.error("    Running Ident: " + running_id)
            logging.error("    Expected Mark: %d", expected_when)
            logging.error("Command Line Mark: %d", cmdline_when)
            logging.error("     Command Line: " + cmdline)

            self._record_reboot_failure(subdir, "reboot.verify", "boot failure",
                                        running_id=running_id)
            raise error.JobError("Reboot returned with the wrong kernel")

        self.record('GOOD', subdir, 'reboot.verify',
                    utils.running_os_full_version())
        self.end_reboot(subdir, expected_id, patches, running_id=running_id)


    def partition(self, device, loop_size=0, mountpoint=None):
        """
        Work with a machine partition

            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
            @param mountpoint: Specify a directory to mount to. If not specified,
                               the autotest tmp directory will be used.
            @param loop_size: Size of loopback device (in MB). Defaults to 0.

            @return: A L{client.bin.partition.partition} object
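
            Example (hypothetical device name):
                part = job.partition('/dev/sdb1', mountpoint='/mnt/scratch')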
        """

        if not mountpoint:
            mountpoint = self.tmpdir
        return partition_lib.partition(self, device, loop_size, mountpoint)

    @utils.deprecated
    def filesystem(self, device, mountpoint=None, loop_size=0):
        """ Same as partition

        @deprecated: Use partition method instead
        """
        return self.partition(device, loop_size, mountpoint)


    def enable_external_logging(self):
        pass


    def disable_external_logging(self):
        pass


    def set_default_profile_only(self, val):
        """ Set the default_profile_only mode. """
        self.set_state("__default_profile_only", val)
        self.default_profile_only = val


    def enable_test_cleanup(self):
        """ By default tests run test.cleanup """
        self.set_state("__run_test_cleanup", True)
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """ By default tests do not run test.cleanup """
        self.set_state("__run_test_cleanup", False)
        self.run_test_cleanup = False


    def default_test_cleanup(self, val):
        if not self._is_continuation:
            self.set_state("__run_test_cleanup", val)
            self.run_test_cleanup = val


    def default_boot_tag(self, tag):
        if not self._is_continuation:
            self.set_state("__last_boot_tag", tag)
            self.last_boot_tag = tag


    def default_tag(self, tag):
        """Allows the scheduler's job tag to be passed in from autoserv."""
        if not self._is_continuation:
            self.set_state("__job_tag", tag)
            self.tag = tag


    def reboot_setup(self):
        # save the partition list and their mount point and compare it
        # after reboot
        partition_list = partition_lib.get_partition_list(self)
        mount_info = set((p.device, p.get_mountpoint()) for p in partition_list)
        self.set_state("__mount_info", mount_info)


    def reboot(self, tag=LAST_BOOT_TAG):
        if tag == LAST_BOOT_TAG:
            tag = self.last_boot_tag
        else:
            self.set_state("__last_boot_tag", tag)
            self.last_boot_tag = tag

        self.reboot_setup()
        self.harness.run_reboot()
        default = self.config_get('boot.set_default')
        if default:
            self.bootloader.set_default(tag)
        else:
            self.bootloader.boot_once(tag)

        # HACK: using this as a module sometimes hangs shutdown, so if it's
        # installed unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)

        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)

        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()


    def noop(self, text):
        logging.info("job: noop: " + text)


    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel"""

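        # Example (hypothetical control-file usage): each task is a sequence
        # whose first element is a callable and the rest are its positional
        # arguments, so this would run two tests concurrently:
        #   job.parallel([job.run_test, 'dbench'],
        #                [job.run_test, 'sleeptest'])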
        pids = []
        old_log_filename = self.log_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            self.log_filename = old_log_filename + (".%d" % i)
            task_func = lambda: task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self.log_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)


    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Clean up and exit"""
        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self.state_file))
        shutil.move(self.state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def set_state(self, var, val):
        # Deep copies make sure that the state can't be altered
        # without it being re-written.  Perf wise, deep copies
        # are overshadowed by pickling/loading.
        self.state[var] = copy.deepcopy(val)
        outfile = open(self.state_file, 'wb')
        try:
            pickle.dump(self.state, outfile, pickle.HIGHEST_PROTOCOL)
        finally:
            outfile.close()
        logging.debug("Persistent state variable %s now set to %r", var, val)


    def _load_state(self):
        if hasattr(self, 'state'):
            raise RuntimeError('state already exists')
        infile = None
        try:
            try:
                infile = open(self.state_file, 'rb')
                self.state = pickle.load(infile)
                logging.info('Loaded state from %s', self.state_file)
            except IOError:
                self.state = {}
                initialize = True
        finally:
            if infile:
                infile.close()

        initialize = '__steps' not in self.state
        if not (self._is_continuation or initialize):
            raise RuntimeError('Loaded state must contain __steps or be a '
                               'continuation.')

        if initialize:
            logging.info('Initializing the state engine')
            self.set_state('__steps', [])  # writes pickle file


    def get_state(self, var, default=NO_DEFAULT):
        if var in self.state or default == NO_DEFAULT:
            val = self.state[var]
        else:
            val = default
        return copy.deepcopy(val)


    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self.current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self.get_state('__steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self.set_state('__steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self.get_state('__steps')
        steps.insert(self.next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self.next_step_index += 1
        self.set_state('__steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self.get_state('__steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self.next_step_index += 1
        self.set_state('__steps', steps)


    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self.current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
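
        Example (hypothetical control file using the step engine; test names
        are illustrative):

            def step_init():
                job.next_step(step_test)
                job.run_test('kernbench')

            def step_test():
                job.run_test('dbench')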
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self.get_state('__steps')) > 0:
            steps = self.get_state('__steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self.set_state('__steps', steps)

            self.next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self.current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        state = self.get_state("__sysinfo", None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self.set_state("__sysinfo", state)


    def _init_group_level(self):
        self.group_level = self.get_state("__group_level", default=0)


    def _increment_group_level(self):
        self.group_level += 1
        self.set_state("__group_level", self.group_level)


    def _decrement_group_level(self):
        self.group_level -= 1
        self.set_state("__group_level", self.group_level)


    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: (GOOD|WARN|FAIL|ABORT)
                or   START
                or   END (GOOD|WARN|FAIL|ABORT)

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping and are governed
        by self.group_level.

        multiline messages have secondary lines prefaced by a double
        space ('  ')
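
        Example of a resulting line (fields are tab separated; the timestamp
        values are illustrative):
                GOOD	dbench	dbench	timestamp=1234567890	localtime=Feb 13 23:31:30	completed successfully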
        """

        if subdir:
            if re.search(r'[\n\t]', subdir):
                raise ValueError("Invalid character in subdir string")
            substr = subdir
        else:
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError("Invalid status code supplied: %s" % status_code)
        if not operation:
            operation = '----'

        if re.search(r'[\n\t]', operation):
            raise ValueError("Invalid character in operation string")
        operation = operation.rstrip()

        if not optional_fields:
            optional_fields = {}

        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)

        # Generate timestamps for inclusion in the logs
        epoch_time = int(time.time())  # seconds since epoch, in UTC
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        msg = '\t'.join(str(x) for x in fields)
        msg = '\t' * self.group_level + msg

        msg_tag = ""
        if "." in self.log_filename:
            msg_tag = self.log_filename.split(".", 1)[1]

        self.harness.test_status_detail(status_code, substr, operation, status,
                                        msg_tag)
        self.harness.test_status(msg, msg_tag)

        # log to stdout (if enabled)
        #if self.log_filename == self.DEFAULT_LOG_FILENAME:
        logging.info(msg)

        # log to the "root" status log
        status_file = os.path.join(self.resultdir, self.log_filename)
        open(status_file, "a").write(msg + "\n")

        # log to the subdir status log (if subdir is set)
        if subdir:
            dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(dir, self.DEFAULT_LOG_FILENAME)
            open(status_file, "a").write(msg + "\n")


class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
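
        # Example (this mirrors how run_test() uses it above):
        #   @disk_usage_monitor.watch(log_warning, "/", max_disk_usage_rate)
        #   def monitored_func():
        #       ...  # runs with disk usage monitoring around the call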
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator


def runjob(control, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
    """
    control = os.path.abspath(control)
    state = control + '.state'
    # Ensure state file is cleaned up before the job starts to run if autotest
    # is not running with the --continue flag
    if not options.cont and os.path.isfile(state):
        logging.debug('Cleaning up previously found state file')
        os.remove(state)

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file, ensure we don't try and continue.
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control, options)

        # Load in the user's control file; it may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError, instance:
        logging.error("JOB ERROR: " + instance.args[0])
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
                myjob.record('ABORT', None, command, instance.args[0])
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, instance.args[0])
            assert (myjob.group_level == 0), ('myjob.group_level must be 0,'
                                              ' not %d' % myjob.group_level)
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us.  If we get here, it's likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, msg)
            assert(myjob.group_level == 0)
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob._decrement_group_level()
    myjob.record('END GOOD', None, None)
    assert(myjob.group_level == 0)

    myjob.complete(0)


site_job = utils.import_site_class(
    __file__, "autotest_lib.client.bin.site_job", "site_job", base_job)

class job(site_job):
    pass
1424