job.py revision 861b2d54aec24228cdb3895dbc40062cb40cb2ad
"""The main job wrapper

This is the core infrastructure.

Copyright Andy Whitcroft, Martin J. Bligh 2006
"""

import copy, os, platform, re, shutil, sys, time, traceback, types, glob
import logging, getpass, errno, weakref
import cPickle as pickle
from autotest_lib.client.bin import client_logging_config
from autotest_lib.client.bin import utils, parallel, kernel, xen
from autotest_lib.client.bin import profilers, boottool, harness
from autotest_lib.client.bin import config, sysinfo, test, local_host
from autotest_lib.client.bin import partition as partition_lib
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import error, barrier, log, logging_manager
from autotest_lib.client.common_lib import base_packages, packages
from autotest_lib.client.common_lib import global_config


LAST_BOOT_TAG = object()
JOB_PREAMBLE = """
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.bin.utils import *
"""


class StepError(error.AutotestError):
    pass


class NotAvailableError(error.AutotestError):
    pass


def _run_test_complete_on_exit(f):
    """Decorator for job methods that automatically calls
    self.harness.run_test_complete when the method exits, if appropriate."""
    def wrapped(self, *args, **dargs):
        try:
            return f(self, *args, **dargs)
        finally:
            if self._logger.global_filename == 'status':
                self.harness.run_test_complete()
                if self.drop_caches:
                    logging.debug("Dropping caches")
                    utils.drop_caches()
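    # Manually propagate the wrapped function's metadata onto the wrapper;
    # this is equivalent to what functools.wraps would do.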
    wrapped.__name__ = f.__name__
    wrapped.__doc__ = f.__doc__
    wrapped.__dict__.update(f.__dict__)
    return wrapped


class status_indenter(base_job.status_indenter):
    """Provide a status indenter that is backed by job._record_prefix."""
    def __init__(self, job):
        self.job = weakref.proxy(job)  # avoid a circular reference


    @property
    def indent(self):
        return self.job._record_indent


    def increment(self):
        self.job._record_indent += 1


    def decrement(self):
        self.job._record_indent -= 1


class base_client_job(base_job.base_job):
    """The client-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        control
        bootloader
        harness
    """

    _WARNING_DISABLE_DELAY = 5

    # _record_indent is a persistent property, but only on the client
    _job_state = base_job.base_job._job_state
    _record_indent = _job_state.property_factory(
        '_state', '_record_indent', 0, namespace='client')
    _max_disk_usage_rate = _job_state.property_factory(
        '_state', '_max_disk_usage_rate', 0.0, namespace='client')


    def __init__(self, control, options, drop_caches=True,
                 extra_copy_cmdline=None):
        """
        Prepare a client side job object.

        @param control: Pathname of the control file.
        @param options: an object which includes:
                jobtag: The job tag string (e.g. "default").
                cont: If this is the continuation of this job.
                harness_type: An alternative server harness.  [None]
                use_external_logging: If true, the enable_external_logging
                          method will be called during construction.  [False]
        @param drop_caches: If true, utils.drop_caches() is called before and
                between all tests.  [True]
        @param extra_copy_cmdline: list of additional /proc/cmdline arguments to
                copy from the running kernel to all the installed kernels with
                this job
        """
        super(base_client_job, self).__init__(options=options)
        self._pre_record_init(control, options)
        try:
            self._post_record_init(control, options, drop_caches,
                                   extra_copy_cmdline)
        except Exception, err:
            self.record(
                    'ABORT', None, None, 'client.bin.job.__init__ failed: %s' %
                    str(err))
            raise


    @classmethod
    def _get_environ_autodir(cls):
        return os.environ['AUTODIR']


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir and clientdir (which are the same)
        using os.environ. Serverdir does not exist in this context.
        """
        autodir = clientdir = cls._get_environ_autodir()
        return autodir, clientdir, None


    @classmethod
    def _parse_args(cls, args):
        return re.findall(r"[^\s]*?['|\"].*?['|\"]|[^\s]+", args)

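    # Illustrative behaviour of the pattern above (a sketch, not a doctest):
    #     base_client_job._parse_args('foo bar="hello world" baz')
    #     => ['foo', 'bar="hello world"', 'baz']
    # Quoted values keep their embedded whitespace; everything else splits on
    # whitespace. (Note the character class ['|"] also matches a literal '|',
    # a quirk of the original pattern.)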

    def _find_resultdir(self, options):
        """
        Determine the directory for storing results. On a client this is
        always <autodir>/results/<tag>, where tag is passed in on the command
        line as an option.
        """
        return os.path.join(self.autodir, 'results', options.tag)


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    def _pre_record_init(self, control, options):
        """
        Initialization function that should perform ONLY the required
        setup so that the self.record() method works.

        Currently self.record() needs self.resultdir, self._group_level,
        self.harness and, of course, self._logger.
        """
        if not options.cont:
            self._cleanup_debugdir_files()
            self._cleanup_results_dir()

        logging_manager.configure_logging(
            client_logging_config.ClientLoggingConfig(),
            results_dir=self.resultdir,
            verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)

        # init_group_level needs the state
        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        self._current_step_ancestry = []
        self._next_step_index = 0
        self._load_state()

        # the harness is chosen by the following rules:
        # 1. explicitly specified via command line
        # 2. harness stored in state file (if continuing job '-c')
        # 3. default harness
        selected_harness = None
        if options.harness:
            selected_harness = options.harness
            self._state.set('client', 'harness', selected_harness)
        else:
            stored_harness = self._state.get('client', 'harness', None)
            if stored_harness:
                selected_harness = stored_harness

        self.harness = harness.select(selected_harness, self)

        # set up the status logger
        def client_job_record_hook(entry):
            msg_tag = ''
            if '.' in self._logger.global_filename:
                msg_tag = self._logger.global_filename.split('.', 1)[1]
            # send the entry to the job harness
            message = '\n'.join([entry.message] + entry.extra_message_lines)
            rendered_entry = self._logger.render_entry(entry)
            self.harness.test_status_detail(entry.status_code, entry.subdir,
                                            entry.operation, message, msg_tag)
            self.harness.test_status(rendered_entry, msg_tag)
            # send the entry to stdout, if it's enabled
            logging.info(rendered_entry)
        self._logger = base_job.status_logger(
            self, status_indenter(self), record_hook=client_job_record_hook,
            tap_writer=self._tap)


    def _post_record_init(self, control, options, drop_caches,
                          extra_copy_cmdline):
        """
        Perform job initialization not required by self.record().
        """
        self._init_drop_caches(drop_caches)

        self._init_packages()

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        if not options.cont:
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))

        self.control = control

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self._config = config.config(self)
        self.profilers = profilers.profilers(self)

        self._init_bootloader()

        self.machines = [options.hostname]
        self.hosts = set([local_host.LocalHost(hostname=options.hostname,
                                               bootloader=self.bootloader)])

        self.args = []
        if options.args:
            self.args = self._parse_args(options.args)

        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        self._init_cmdline(extra_copy_cmdline)

        self.num_tests_run = None
        self.num_tests_failed = None

        self.warning_loggers = None
        self.warning_manager = None


    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        self.drop_caches_between_iterations = (
                       global_config.global_config.get_config_value('CLIENT',
                                            'drop_caches_between_iterations',
                                            type=bool, default=True))
        self.drop_caches = drop_caches
        if self.drop_caches:
            logging.debug("Dropping caches")
            utils.drop_caches()


    def _init_bootloader(self):
        """
        Perform boottool initialization.
        """
        tool = self.config_get('boottool.executable')
        self.bootloader = boottool.boottool(tool)


    def _init_packages(self):
        """
        Perform the packages support initialization.
        """
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout': 3600})


    def _init_cmdline(self, extra_copy_cmdline):
        """
        Initialize default cmdline for booted kernels in this job.
        """
        copy_cmdline = set(['console'])
        if extra_copy_cmdline is not None:
            copy_cmdline.update(extra_copy_cmdline)

        # extract console= and other args from cmdline and add them into the
        # base args that we use for all kernels we install
        cmdline = utils.read_one_line('/proc/cmdline')
        kernel_args = []
        for karg in cmdline.split():
            for param in copy_cmdline:
                if karg.startswith(param) and \
                    (len(param) == len(karg) or karg[len(param)] == '='):
                    kernel_args.append(karg)
        self.config_set('boot.default_args', ' '.join(kernel_args))

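    # Sketch of the extraction above: on a host whose /proc/cmdline reads
    # "ro root=/dev/sda1 console=ttyS0,115200" (hypothetical), the default
    # copy set of {'console'} yields kernel_args == ['console=ttyS0,115200'],
    # so every kernel this job installs boots with the same console settings.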

    def _cleanup_results_dir(self):
        """Delete everything in the results directory."""
        assert os.path.exists(self.resultdir)
        list_files = glob.glob('%s/*' % self.resultdir)
        for f in list_files:
            if os.path.isdir(f):
                shutil.rmtree(f)
            elif os.path.isfile(f):
                os.remove(f)


    def _cleanup_debugdir_files(self):
        """
        Delete any leftover debugdir files
        """
        list_files = glob.glob("/tmp/autotest_results_dir.*")
        for f in list_files:
            os.remove(f)


    def disable_warnings(self, warning_type):
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})
        time.sleep(self._WARNING_DISABLE_DELAY)


    def enable_warnings(self, warning_type):
        time.sleep(self._WARNING_DISABLE_DELAY)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def monitor_disk_usage(self, max_rate):
        """\
        Signal that the job should monitor disk space usage on /
        and generate a warning if a test uses up disk space at a
        rate exceeding 'max_rate'.

        Parameters:
             max_rate - the maximum allowed rate of disk consumption
                        during a test, in MB/hour, or 0 to indicate
                        no limit.
        """
        self._max_disk_usage_rate = max_rate

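    # Illustrative use from a control file (hypothetical rate):
    #     job.monitor_disk_usage(max_rate=500)  # warn above 500 MB/hour
    #     job.monitor_disk_usage(max_rate=0)    # disable the monitoring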

    def relative_path(self, path):
        """\
        Return a path relative to the job results directory
        """
        head = len(self.resultdir) + 1     # remove the / in between
        return path[head:]


    def control_get(self):
        return self.control


    def control_set(self, control):
        self.control = os.path.abspath(control)


    def harness_select(self, which):
        self.harness = harness.select(which, self)


    def config_set(self, name, value):
        self._config.set(name, value)


    def config_get(self, name):
        return self._config.get(name)


    def setup_dirs(self, results_dir, tmp_dir):
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def xen(self, base_tree, results_dir='', tmp_dir='', leave=False,
            kjob=None):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)


    def kernel(self, base_tree, results_dir='', tmp_dir='', leave=False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
                                  build_dir, leave)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory in which the source is actually
                      untarred into. (ex: client/profilers/<name> for profilers)
        '''
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a string list
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              base_packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass


    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            if execfile('%s.py' % dep, {}) is None:
                logging.info('Dependency %s successfully built', dep)

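    # Illustrative use from a test (hypothetical dependency name):
    #     job.setup_dep(['libaio'])
    #     # the dep is now unpacked and built under <autodir>/deps/libaio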

    def _runtest(self, url, tag, args, dargs):
        try:
            l = lambda: test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)
            parallel.fork_waitfor(self.resultdir, pid)
        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url: A url that identifies the test to run.
        @param tag: An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag: An optional keyword argument that will be added
            to the subdir name.

        @returns True if the test passes, False otherwise.
        """
        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)

        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            self._rungroup(subdir, testname, group_func)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.


    def _rungroup(self, subdir, testname, function, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed-in function
        """

        try:
            self.record('START', subdir, testname)
            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            result = function(*args, **dargs)
            self.record('END GOOD', subdir, testname)
            return result
        except error.TestBaseException, e:
            self.record('END %s' % e.exit_status, subdir, testname)
            raise
        except error.JobError:
            self.record('END ABORT', subdir, testname)
            raise
        except Exception, e:
            # This should only ever happen due to a bug in the given
            # function's code.  The common case of being called by
            # run_test() will never reach this.  If a control file called
            # run_group() itself, bugs in its function will be caught
            # here.
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END ERROR', subdir, testname, err_msg)
            raise
        finally:
            self._state.discard('client', 'unexpected_reboot')


    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group.  If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)

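    # Illustrative use from a control file ('build_phase' is a hypothetical
    # function defined there):
    #     def build_phase(kernel_version):
    #         ...
    #     job.run_group(build_phase, tag='build', kernel_version='2.6.32')
    # This records START/END entries around build_phase and passes the named
    # arguments straight through to it.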

    def cpu_count(self):
        return utils.count_cpus()  # use total system count


    def start_reboot(self):
        self.record('START', None, 'reboot')
        self.record('GOOD', None, 'reboot.start')


    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        self.record("ABORT", subdir, operation, status)
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)


    def _check_post_reboot(self, subdir, running_id=None):
        """
        Perform post-boot checks, such as whether the system configuration
        has changed across reboots (specifically, CPUs and partitions).

        @param subdir: The subdir to use in the job.record call.
        @param running_id: An optional running_id to include in the reboot
            failure log message

        @raise JobError: Raised if the current configuration does not match the
            pre-reboot configuration.
        """
        # check to see if any partitions have changed
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        old_mount_info = self._state.get('client', 'mount_info')
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)

        # check to see if any CPUs have changed
        cpu_count = utils.count_cpus()
        old_count = self._state.get('client', 'cpu_count')
        if cpu_count != old_count:
            description = ('Number of CPUs changed after reboot '
                           '(old count: %d, new count: %d)' %
                           (old_count, cpu_count))
            self._record_reboot_failure(subdir, 'reboot.verify_config',
                                        description, running_id=running_id)
            raise error.JobError('Reboot failed: %s' % description)


    def end_reboot(self, subdir, kernel, patches, running_id=None):
        self._check_post_reboot(subdir, running_id=running_id)

        # strip ::<timestamp> from the kernel version if present
        kernel = kernel.split("::")[0]
        kernel_info = {"kernel": kernel}
        for i, patch in enumerate(patches):
            kernel_info["patch%d" % i] = patch
        self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)


    def end_reboot_and_verify(self, expected_when, expected_id, subdir,
                              type='src', patches=[]):
        """Check the passed kernel identifier against the command line
        and the running kernel; abort the job on mismatch."""

        logging.info("POST BOOT: checking booted kernel "
                     "mark=%d identity='%s' type='%s'",
                     expected_when, expected_id, type)

        running_id = utils.running_os_ident()

        cmdline = utils.read_one_line("/proc/cmdline")

        find_sum = re.compile(r'.*IDENT=(\d+)')
        m = find_sum.match(cmdline)
        cmdline_when = -1
        if m:
            cmdline_when = int(m.groups()[0])

        # We have all the facts, see if they indicate we
        # booted the requested kernel or not.
        bad = False
        if (type == 'src' and expected_id != running_id or
            type == 'rpm' and
            not running_id.startswith(expected_id + '::')):
            logging.error("Kernel identifier mismatch")
            bad = True
        if expected_when != cmdline_when:
            logging.error("Kernel command line mismatch")
            bad = True

        if bad:
            logging.error("   Expected Ident: " + expected_id)
            logging.error("    Running Ident: " + running_id)
            logging.error("    Expected Mark: %d", expected_when)
            logging.error("Command Line Mark: %d", cmdline_when)
            logging.error("     Command Line: " + cmdline)

            self._record_reboot_failure(subdir, "reboot.verify", "boot failure",
                                        running_id=running_id)
            raise error.JobError("Reboot returned with the wrong kernel")

        self.record('GOOD', subdir, 'reboot.verify',
                    utils.running_os_full_version())
        self.end_reboot(subdir, expected_id, patches, running_id=running_id)


    def partition(self, device, loop_size=0, mountpoint=None):
        """
        Work with a machine partition

            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
            @param mountpoint: Specify a directory to mount to. If not specified
                               autotest tmp directory will be used.
            @param loop_size: Size of loopback device (in MB). Defaults to 0.

            @return: A L{client.bin.partition.partition} object
        """

        if not mountpoint:
            mountpoint = self.tmpdir
        return partition_lib.partition(self, device, loop_size, mountpoint)

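    # Illustrative use (hypothetical device; assumes the partition object's
    # usual mkfs/mount helpers):
    #     part = job.partition('/dev/sdb1')
    #     part.mkfs('ext3')
    #     part.mount()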
    @utils.deprecated
    def filesystem(self, device, mountpoint=None, loop_size=0):
        """ Same as partition

        @deprecated: Use partition method instead
        """
        return self.partition(device, loop_size, mountpoint)


    def enable_external_logging(self):
        pass


    def disable_external_logging(self):
        pass


    def reboot_setup(self):
        # save the partition list and mount points, as well as the cpu count
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        self._state.set('client', 'mount_info', mount_info)
        self._state.set('client', 'cpu_count', utils.count_cpus())


    def reboot(self, tag=LAST_BOOT_TAG):
        if tag == LAST_BOOT_TAG:
            tag = self.last_boot_tag
        else:
            self.last_boot_tag = tag

        self.reboot_setup()
        self.harness.run_reboot()
        default = self.config_get('boot.set_default')
        if default:
            self.bootloader.set_default(tag)
        else:
            self.bootloader.boot_once(tag)

        # HACK: using this as a module sometimes hangs shutdown, so if it's
        # installed unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)

        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)

        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()


    def noop(self, text):
        logging.info("job: noop: " + text)


    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel"""

        pids = []
        old_log_filename = self._logger.global_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            self._logger.global_filename = old_log_filename + (".%d" % i)
            def task_func():
                # stub out _record_indent with a process-local one
                base_record_indent = self._record_indent
                proc_local = self._job_state.property_factory(
                    '_state', '_record_indent.%d' % os.getpid(),
                    base_record_indent, namespace='client')
                self.__class__._record_indent = proc_local
                task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self._logger.global_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)


    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Write pending TAP reports, clean up, and exit"""
        # write out TAP reports
        if self._tap.do_tap_report:
            self._tap.write()
            self._tap._write_tap_archive()

        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.info('Initializing the state engine')
            self._state.set('client', 'steps', [])


    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)

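    # Ordering sketch for the three helpers above, starting from an empty
    # queue at the top of a step (a, b and c are hypothetical step functions):
    #     job.next_step_append(a)   # queue: [a]
    #     job.next_step(b)          # queue: [b, a]
    #     job.next_step_prepend(c)  # queue: [c, b, a]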

    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])
        else:
            # if the last job failed due to an unexpected reboot, record it
            # as a failure so that the harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = "disk space on %s was consumed at a rate of %.2f MB/hour"
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator

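# Illustrative use of the watch decorator (hypothetical rate and step):
#     @disk_usage_monitor.watch(logging.warning, "/", 100)
#     def io_heavy_step():
#         ...
# The monitor snapshots free space on "/" before the call and, once the call
# finishes, warns through the supplied function if consumption exceeded
# 100 MB/hour.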

def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
    """
    control = os.path.abspath(control)
    state = control + '.state'
    # Ensure the state file is cleaned up before the job starts to run if
    # autotest is not running with the --continue flag
    if not options.cont and os.path.isfile(state):
        logging.debug('Cleaning up previously found state file')
        os.remove(state)

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file; ensure we don't try to continue.
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control=control, drop_caches=drop_caches, options=options)

        # Load in the user's control file; it may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError, instance:
        logging.error("JOB ERROR: " + str(instance))
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
                myjob.record('ABORT', None, command, str(instance))
            myjob.record('END ABORT', None, None, str(instance))
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us.  If we get here, it's likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob.record('END ABORT', None, None, msg)
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob.record('END GOOD', None, None)
    assert myjob._record_indent == 0

    myjob.complete(0)


site_job = utils.import_site_class(
    __file__, "autotest_lib.client.bin.site_job", "site_job", base_client_job)


class job(site_job):
    pass
1254