job.py revision ce955fcf58207d795ff1fe0001851b8b3728452c
1"""The main job wrapper
2
3This is the core infrastructure.
4
5Copyright Andy Whitcroft, Martin J. Bligh 2006
6"""

import copy, os, platform, re, shutil, sys, time, traceback, types, glob
import logging
import cPickle as pickle
from autotest_lib.client.bin import client_logging_config
from autotest_lib.client.bin import utils, parallel, kernel, xen
from autotest_lib.client.bin import profilers, boottool, harness
from autotest_lib.client.bin import config, sysinfo, test, local_host
from autotest_lib.client.bin import partition as partition_lib
from autotest_lib.client.common_lib import error, barrier, log, logging_manager
from autotest_lib.client.common_lib import base_packages, packages

LAST_BOOT_TAG = object()
NO_DEFAULT = object()
JOB_PREAMBLE = """
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.bin.utils import *
"""


class StepError(error.AutotestError):
    pass

class NotAvailableError(error.AutotestError):
    pass


def _run_test_complete_on_exit(f):
    """Decorator for job methods that automatically calls
    self.harness.run_test_complete when the method exits, if appropriate."""
    def wrapped(self, *args, **dargs):
        try:
            return f(self, *args, **dargs)
        finally:
            if self.log_filename == self.DEFAULT_LOG_FILENAME:
                self.harness.run_test_complete()
                if self.drop_caches:
                    logging.debug("Dropping caches")
                    utils.drop_caches()
    wrapped.__name__ = f.__name__
    wrapped.__doc__ = f.__doc__
    wrapped.__dict__.update(f.__dict__)
    return wrapped


class base_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
                    Comes from os.environ['AUTODIR'].
            bindir
                    <autodir>/bin/
            libdir
                    <autodir>/lib/
            testdir
                    <autodir>/tests/
            configdir
                    <autodir>/config/
            site_testdir
                    <autodir>/site_tests/
            profdir
                    <autodir>/profilers/
            tmpdir
                    <autodir>/tmp/
            pkgdir
                    <autodir>/packages/
            resultdir
                    <autodir>/results/<jobtag>
            toolsdir
                    <autodir>/tools/
            logging
                    logging_manager.LoggingManager object
            profilers
                    the profilers object for this job
            harness
                    the server harness object for this job
            config
                    the job configuration for this job
            drop_caches_between_iterations
                    drop the pagecache between each iteration
    """

    DEFAULT_LOG_FILENAME = "status"
    WARNING_DISABLE_DELAY = 5

    def __init__(self, control, options, drop_caches=True,
                 extra_copy_cmdline=None):
        """
        Prepare a client side job object.

        @param control: Pathname of the control file.
        @param options: an object which includes:
                tag: The job tag string (eg "default").
                cont: Whether this is a continuation of a previous job.
                harness_type: An alternative server harness.  [None]
                use_external_logging: If true, the enable_external_logging
                        method will be called during construction.  [False]
        @param drop_caches: If true, utils.drop_caches() is called before and
                between all tests.  [True]
        @param extra_copy_cmdline: list of additional /proc/cmdline arguments
                to copy from the running kernel to all kernels installed by
                this job.
        """
        self.autodir = os.environ['AUTODIR']
        self.bindir = os.path.join(self.autodir, 'bin')
        self.libdir = os.path.join(self.autodir, 'lib')
        self.testdir = os.path.join(self.autodir, 'tests')
        self.configdir = os.path.join(self.autodir, 'config')
        self.site_testdir = os.path.join(self.autodir, 'site_tests')
        self.profdir = os.path.join(self.autodir, 'profilers')
        self.tmpdir = os.path.join(self.autodir, 'tmp')
        self.toolsdir = os.path.join(self.autodir, 'tools')
        self.resultdir = os.path.join(self.autodir, 'results', options.tag)

        if not os.path.exists(self.resultdir):
            os.makedirs(self.resultdir)

        if not options.cont:
            self._cleanup_results_dir()

        logging_manager.configure_logging(
                client_logging_config.ClientLoggingConfig(),
                results_dir=self.resultdir,
                verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)

        self.drop_caches_between_iterations = False
        self.drop_caches = drop_caches
        if self.drop_caches:
            logging.debug("Dropping caches")
            utils.drop_caches()

        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        self.state_file = self.control + '.state'
        self.current_step_ancestry = []
        self.next_step_index = 0
        self.testtag = ''
        self._test_tag_prefix = ''

        self._load_state()
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout': 3600})
        self.pkgdir = os.path.join(self.autodir, 'packages')
        self.run_test_cleanup = self.get_state("__run_test_cleanup",
                                               default=True)

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        self.last_boot_tag = self.get_state("__last_boot_tag", default=None)
        self.tag = self.get_state("__job_tag", default=None)

        if not options.cont:
            # Don't clean up the tmp dir (which contains the lockfile) in
            # the constructor; that would be a problem for multiple jobs
            # starting at the same time on the same client. Instead the
            # delete is done on the server side. We simply create the tmp
            # directory here if it does not already exist.
            if not os.path.exists(self.tmpdir):
                os.mkdir(self.tmpdir)

            if not os.path.exists(self.pkgdir):
                os.mkdir(self.pkgdir)

            results = os.path.join(self.autodir, 'results')
            if not os.path.exists(results):
                os.mkdir(results)

            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            os.makedirs(os.path.join(self.resultdir, 'analysis'))

            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))


        self.control = control
        self.jobtag = options.tag
        self.log_filename = self.DEFAULT_LOG_FILENAME

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self._init_group_level()

        self.config = config.config(self)
        self.harness = harness.select(options.harness, self)
        self.profilers = profilers.profilers(self)
        self.host = local_host.LocalHost(hostname=options.hostname)

        try:
            tool = self.config_get('boottool.executable')
            self.bootloader = boottool.boottool(tool)
        except Exception:
            # boottool may be missing or unconfigured; tests that never
            # touch the bootloader can still run.
            pass

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)
            self._increment_group_level()

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        # load the max disk usage rate - default to no monitoring
        self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0)

        copy_cmdline = set(['console'])
        if extra_copy_cmdline is not None:
            copy_cmdline.update(extra_copy_cmdline)

        # extract console= and other args from cmdline and add them into the
        # base args that we use for all kernels we install
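        # Example (illustrative): with copy_cmdline = set(['console']) and a
        # running cmdline of "ro console=ttyS0,115200 root=/dev/sda1", the
        # loop below collects ['console=ttyS0,115200']; a bare "console"
        # token (no '=') would also be copied.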
        cmdline = utils.read_one_line('/proc/cmdline')
        kernel_args = []
        for karg in cmdline.split():
            for param in copy_cmdline:
                if (karg.startswith(param) and
                        (len(param) == len(karg) or karg[len(param)] == '=')):
                    kernel_args.append(karg)
        self.config_set('boot.default_args', ' '.join(kernel_args))


    def _cleanup_results_dir(self):
        """Delete everything in resultdir"""
        assert os.path.exists(self.resultdir)
        list_files = glob.glob('%s/*' % self.resultdir)
        for f in list_files:
            if os.path.isdir(f):
                shutil.rmtree(f)
            elif os.path.isfile(f):
                os.remove(f)


    def disable_warnings(self, warning_type):
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})
        time.sleep(self.WARNING_DISABLE_DELAY)


    def enable_warnings(self, warning_type):
        time.sleep(self.WARNING_DISABLE_DELAY)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def monitor_disk_usage(self, max_rate):
        """\
        Signal that the job should monitor disk space usage on /
        and generate a warning if a test uses up disk space at a
        rate exceeding 'max_rate'.

        Parameters:
             max_rate - the maximum allowed rate of disk consumption
                        during a test, in MB/hour, or 0 to indicate
                        no limit.
        """
        self.set_state('__monitor_disk', max_rate)
        self.max_disk_usage_rate = max_rate

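    # Example (illustrative): warn if any test consumes disk space on "/"
    # faster than 500 MB/hour:
    #     job.monitor_disk_usage(500)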

    def relative_path(self, path):
        """\
        Return a path relative to the job results directory
        """
        head = len(self.resultdir) + 1     # remove the / in between
        return path[head:]

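    # Example (illustrative): if resultdir is
    # "/usr/local/autotest/results/default", then
    # relative_path("/usr/local/autotest/results/default/dbench/debug")
    # returns "dbench/debug".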

    def control_get(self):
        return self.control


    def control_set(self, control):
        self.control = os.path.abspath(control)


    def harness_select(self, which):
        self.harness = harness.select(which, self)


    def config_set(self, name, value):
        self.config.set(name, value)


    def config_get(self, name):
        return self.config.get(name)


    def setup_dirs(self, results_dir, tmp_dir):
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def xen(self, base_tree, results_dir='', tmp_dir='', leave=False,
            kjob=None):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)


    def kernel(self, base_tree, results_dir='', tmp_dir='', leave=False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
                                  build_dir, leave)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)

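    # Example (illustrative; see client/common_lib/barrier.py for the actual
    # signature): synchronize this host with a peer at a named point:
    #     b = job.barrier(hostid, "sync-point", timeout=600)
    #     b.rendezvous("host1", "host2")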

    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory into which the source is actually
                      untarred (ex: client/profilers/<name> for profilers)
        '''
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a list of strings.
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file, which contains the checksums
        # of all the packages, if it is not already fetched. The checksum
        # file is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              base_packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # The packaging system might not be working in this case.
            # Silently fall back to the normal case.
            pass


    def require_gcc(self):
        """
        Raise NotAvailableError if gcc is not installed on the machine.
        """
        try:
            utils.system('which gcc')
        except error.CmdError:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            utils.system('./' + dep + '.py')

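    # Example (illustrative): a test's setup() might pull in a dependency
    # and then build against it:
    #     self.job.setup_dep(['libaio'])
    #     ldflags = '-L' + os.path.join(self.job.autodir, 'deps/libaio/lib')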

    def _runtest(self, url, tag, args, dargs):
        try:
            l = lambda: test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)
            parallel.fork_waitfor(self.resultdir, pid)
        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
                url of the test to run
        tag
                optional keyword argument; a tag appended to the reported
                test name
        """

        if not url:
            raise TypeError("Test name is invalid. "
                            "Switched arguments?")
        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
        namelen = len(testname)
        dargs = dargs.copy()
        tntag = dargs.pop('tag', None)
        if tntag:         # per-test tag is included in reported test name
            testname += '.' + str(tntag)
        run_number = self.get_run_number()
        if run_number:
            testname += '._%02d_' % run_number
            self.set_run_number(run_number + 1)
        if self._is_kernel_in_test_tag():
            testname += '.' + platform.release()
        if self._test_tag_prefix:
            testname += '.' + self._test_tag_prefix
        if self.testtag:  # job-level tag is included in reported test name
            testname += '.' + self.testtag
        subdir = testname
        sdtag = dargs.pop('subdir_tag', None)
        if sdtag:  # subdir-only tag is not included in reports
            subdir = subdir + '.' + str(sdtag)
        tag = subdir[namelen+1:]    # '' if none

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag))
            raise error.TestError(msg)
        # NOTE: client/common_lib/test.py runtest() depends on directory names
        # being constructed the same way as in this code.
        os.mkdir(outputdir)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            self._rungroup(subdir, testname, group_func)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.

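    # Example (illustrative): the calls below report tests named 'sleeptest'
    # and 'sleeptest.quick', writing into result subdirectories 'sleeptest',
    # 'sleeptest.quick' and 'sleeptest.disk1' respectively (subdir_tag only
    # affects the subdirectory, not the reported name):
    #     job.run_test('sleeptest')
    #     job.run_test('sleeptest', tag='quick')
    #     job.run_test('sleeptest', subdir_tag='disk1')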

    def _rungroup(self, subdir, testname, function, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function
        **dargs:
                named arguments for the function

        Returns the result of the passed in function
        """

        try:
            self.record('START', subdir, testname)
            self._increment_group_level()
            result = function(*args, **dargs)
            self._decrement_group_level()
            self.record('END GOOD', subdir, testname)
            return result
        except error.TestBaseException, e:
            self._decrement_group_level()
            self.record('END %s' % e.exit_status, subdir, testname)
            raise
        except error.JobError, e:
            self._decrement_group_level()
            self.record('END ABORT', subdir, testname)
            raise
        except Exception, e:
            # This should only ever happen due to a bug in the given
            # function's code.  The common case of being called by
            # run_test() will never reach this.  If a control file called
            # run_group() itself, bugs in its function will be caught
            # here.
            self._decrement_group_level()
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END ERROR', subdir, testname, err_msg)
            raise


    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group.  If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)

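    # Example (illustrative): group several related steps under one
    # START/END block in the status log:
    #     def setup_phase():
    #         job.run_test('sleeptest', tag='warmup')
    #     job.run_group(setup_phase, tag='setup')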

    _RUN_NUMBER_STATE = '__run_number'
    def get_run_number(self):
        """Get the run number or 0 if no run number has been set."""
        return self.get_state(self._RUN_NUMBER_STATE, default=0)


    def set_run_number(self, number):
        """If the run number is non-zero it will be in the output dir name."""
        self.set_state(self._RUN_NUMBER_STATE, number)


    _KERNEL_IN_TAG_STATE = '__kernel_version_in_test_tag'
    def _is_kernel_in_test_tag(self):
        """Boolean: should the kernel version be included in the test tag."""
        return self.get_state(self._KERNEL_IN_TAG_STATE, default=False)


    def show_kernel_in_test_tag(self, value=True):
        """If True, the kernel version at test time will prefix test tags."""
        self.set_state(self._KERNEL_IN_TAG_STATE, value)


    def set_test_tag(self, tag=''):
        """Set tag to be added to test name of all following run_test steps."""
        self.testtag = tag


    def set_test_tag_prefix(self, prefix):
        """Set a prefix string to prepend to all future run_test steps.

        Args:
          prefix: A string to prepend to any run_test() step tags separated by
              a '.'; use the empty string '' to clear it.
        """
        self._test_tag_prefix = prefix


    def cpu_count(self):
        return utils.count_cpus()  # use total system count


    def start_reboot(self):
        self.record('START', None, 'reboot')
        self._increment_group_level()
        self.record('GOOD', None, 'reboot.start')


    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        self.record("ABORT", subdir, operation, status)
        self._decrement_group_level()
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)


    def _check_post_reboot(self, subdir, running_id=None):
        """
        Perform post-boot checks, such as whether the mounted partition
        list has changed across the reboot.
        """
        partition_list = partition_lib.get_partition_list(self)
        mount_info = set((p.device, p.get_mountpoint()) for p in partition_list)
        old_mount_info = self.get_state("__mount_info")
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)


    def end_reboot(self, subdir, kernel, patches, running_id=None):
        self._check_post_reboot(subdir, running_id=running_id)

        # strip ::<timestamp> from the kernel version if present
        kernel = kernel.split("::")[0]
        kernel_info = {"kernel": kernel}
        for i, patch in enumerate(patches):
            kernel_info["patch%d" % i] = patch
        self._decrement_group_level()
        self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)


    def end_reboot_and_verify(self, expected_when, expected_id, subdir,
                              type='src', patches=[]):
        """ Check the passed kernel identifier against the command line
            and the running kernel; abort the job on a mismatch. """

        logging.info("POST BOOT: checking booted kernel "
                     "mark=%d identity='%s' type='%s'",
                     expected_when, expected_id, type)

        running_id = utils.running_os_ident()

        cmdline = utils.read_one_line("/proc/cmdline")

        find_sum = re.compile(r'.*IDENT=(\d+)')
        m = find_sum.match(cmdline)
        cmdline_when = -1
        if m:
            cmdline_when = int(m.groups()[0])

        # We have all the facts, see if they indicate we
        # booted the requested kernel or not.
        bad = False
        if (type == 'src' and expected_id != running_id or
            type == 'rpm' and
            not running_id.startswith(expected_id + '::')):
            logging.error("Kernel identifier mismatch")
            bad = True
        if expected_when != cmdline_when:
            logging.error("Kernel command line mismatch")
            bad = True

        if bad:
            logging.error("   Expected Ident: " + expected_id)
            logging.error("    Running Ident: " + running_id)
            logging.error("    Expected Mark: %d", expected_when)
            logging.error("Command Line Mark: %d", cmdline_when)
            logging.error("     Command Line: " + cmdline)

            self._record_reboot_failure(subdir, "reboot.verify",
                                        "boot failure",
                                        running_id=running_id)
            raise error.JobError("Reboot returned with the wrong kernel")

        self.record('GOOD', subdir, 'reboot.verify',
                    utils.running_os_full_version())
        self.end_reboot(subdir, expected_id, patches, running_id=running_id)


    def partition(self, device, loop_size=0, mountpoint=None):
        """
        Work with a machine partition

            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
            @param mountpoint: Specify a directory to mount to. If not
                               specified, the autotest tmp directory will
                               be used.
            @param loop_size: Size of loopback device (in MB). Defaults to 0.

            @return: A L{client.bin.partition.partition} object
        """

        if not mountpoint:
            mountpoint = self.tmpdir
        return partition_lib.partition(self, device, loop_size, mountpoint)
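
    # Example (illustrative; see client/bin/partition.py for the full API):
    #     part = job.partition('/dev/sdb1', mountpoint=job.tmpdir)
    #     part.mkfs('ext3')
    #     part.mount()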

    @utils.deprecated
    def filesystem(self, device, mountpoint=None, loop_size=0):
        """ Same as partition

        @deprecated: Use partition method instead
        """
        return self.partition(device, loop_size, mountpoint)


    def enable_external_logging(self):
        pass


    def disable_external_logging(self):
        pass


    def enable_test_cleanup(self):
        """ By default tests run test.cleanup """
        self.set_state("__run_test_cleanup", True)
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """ By default tests do not run test.cleanup """
        self.set_state("__run_test_cleanup", False)
        self.run_test_cleanup = False


    def default_test_cleanup(self, val):
        if not self._is_continuation:
            self.set_state("__run_test_cleanup", val)
            self.run_test_cleanup = val


    def default_boot_tag(self, tag):
        if not self._is_continuation:
            self.set_state("__last_boot_tag", tag)
            self.last_boot_tag = tag


    def default_tag(self, tag):
        """Allows the scheduler's job tag to be passed in from autoserv."""
        if not self._is_continuation:
            self.set_state("__job_tag", tag)
            self.tag = tag


    def reboot_setup(self):
        # save the partition list and their mount points, and compare them
        # after reboot
        partition_list = partition_lib.get_partition_list(self)
        mount_info = set((p.device, p.get_mountpoint()) for p in partition_list)
        self.set_state("__mount_info", mount_info)


    def reboot(self, tag=LAST_BOOT_TAG):
        if tag == LAST_BOOT_TAG:
            tag = self.last_boot_tag
        else:
            self.set_state("__last_boot_tag", tag)
            self.last_boot_tag = tag

        self.reboot_setup()
        self.harness.run_reboot()
        default = self.config_get('boot.set_default')
        if default:
            self.bootloader.set_default(tag)
        else:
            self.bootloader.boot_once(tag)

        # HACK: using this as a module sometimes hangs shutdown, so if it's
        # installed unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)

        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)

        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()


    def noop(self, text):
        logging.info("job: noop: " + text)


    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel"""

        pids = []
        old_log_filename = self.log_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            self.log_filename = old_log_filename + (".%d" % i)
            # Bind the current task explicitly; fork_start runs the function
            # immediately in a forked child.
            task_func = lambda task=task: task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self.log_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)

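    # Example (illustrative): run two tests concurrently; each task is a
    # list of [callable, args...]:
    #     job.parallel([job.run_test, 'dbench'],
    #                  [job.run_test, 'sleeptest'])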

    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Clean up and exit"""
        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self.state_file))
        os.rename(self.state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def set_state(self, var, val):
        # Deep copies make sure that the state can't be altered
        # without it being re-written.  Perf wise, deep copies
        # are overshadowed by pickling/loading.
        self.state[var] = copy.deepcopy(val)
        outfile = open(self.state_file, 'wb')
        try:
            pickle.dump(self.state, outfile, pickle.HIGHEST_PROTOCOL)
        finally:
            outfile.close()
        logging.debug("Persistent state variable %s now set to %r", var, val)


    def _load_state(self):
        if hasattr(self, 'state'):
            raise RuntimeError('state already exists')
        infile = None
        try:
            try:
                infile = open(self.state_file, 'rb')
                self.state = pickle.load(infile)
                logging.info('Loaded state from %s', self.state_file)
            except IOError:
                self.state = {}
        finally:
            if infile:
                infile.close()

        initialize = '__steps' not in self.state
        if not (self._is_continuation or initialize):
            raise RuntimeError('Loaded state can only contain __steps if '
                               'this is a continuation')

        if initialize:
            logging.info('Initializing the state engine')
            self.set_state('__steps', [])  # writes pickle file


    def get_state(self, var, default=NO_DEFAULT):
        if var in self.state or default is NO_DEFAULT:
            val = self.state[var]
        else:
            val = default
        return copy.deepcopy(val)

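    # Example (illustrative): state persists across reboots via the .state
    # pickle, so one step can pass values to a later step:
    #     job.set_state('build_count', 3)
    #     job.get_state('build_count')            # -> 3 (a deep copy)
    #     job.get_state('missing', default=None)  # -> None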

    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self.current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self.get_state('__steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self.set_state('__steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self.get_state('__steps')
        steps.insert(self.next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self.next_step_index += 1
        self.set_state('__steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self.get_state('__steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self.next_step_index += 1
        self.set_state('__steps', steps)

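    # Example (illustrative): a control file using the step engine to
    # survive a reboot between steps:
    #     def step_init():
    #         job.next_step('step_test')
    #         testkernel = job.kernel('2.6.18')
    #         testkernel.build()
    #         testkernel.boot()    # reboots; the job resumes at step_test
    #     def step_test():
    #         job.run_test('kernbench')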

    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self.current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self.get_state('__steps')) > 0:
            steps = self.get_state('__steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self.set_state('__steps', steps)

            self.next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self.current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        state = self.get_state("__sysinfo", None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self.set_state("__sysinfo", state)


    def _init_group_level(self):
        self.group_level = self.get_state("__group_level", default=0)


    def _increment_group_level(self):
        self.group_level += 1
        self.set_state("__group_level", self.group_level)


    def _decrement_group_level(self):
        self.group_level -= 1
        self.set_state("__group_level", self.group_level)


    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: (GOOD|WARN|FAIL|ABORT)
                or   START
                or   END (GOOD|WARN|FAIL|ABORT)

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and are
        governed by self.group_level

        Multiline messages have secondary lines prefaced by a double
        space ('  ')
        """

        if subdir:
            if re.search(r'[\n\t]', subdir):
                raise ValueError("Invalid character in subdir string")
            substr = subdir
        else:
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError("Invalid status code supplied: %s" % status_code)
        if not operation:
            operation = '----'

        if re.search(r'[\n\t]', operation):
            raise ValueError("Invalid character in operation string")
        operation = operation.rstrip()

        if not optional_fields:
            optional_fields = {}

        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)

        # Generate timestamps for inclusion in the logs
        epoch_time = int(time.time())  # seconds since epoch, in UTC
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        msg = '\t'.join(str(x) for x in fields)
        msg = '\t' * self.group_level + msg

        msg_tag = ""
        if "." in self.log_filename:
            msg_tag = self.log_filename.split(".", 1)[1]

        self.harness.test_status_detail(status_code, substr, operation, status,
                                        msg_tag)
        self.harness.test_status(msg, msg_tag)

        # log to stdout
        logging.info(msg)

        # log to the "root" status log
        status_file = os.path.join(self.resultdir, self.log_filename)
        open(status_file, "a").write(msg + "\n")

        # log to the subdir status log (if subdir is set)
        if subdir:
            subdir_path = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(subdir_path, self.DEFAULT_LOG_FILENAME)
            open(status_file, "a").write(msg + "\n")


class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour"
                   % (self.device, mb_per_hour))
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator
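
    # Example (illustrative): warn via log_fn if "/" fills faster than
    # 500 MB/hour while fn runs:
    #     @disk_usage_monitor.watch(log_fn, "/", 500)
    #     def fn():
    #         ...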


def runjob(control, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
    """
    control = os.path.abspath(control)
    state = control + '.state'
    # Ensure the state file is cleaned up before the job starts to run if
    # autotest is not running with the --continue flag
    if not options.cont and os.path.isfile(state):
        logging.debug('Cleaning up previously found state file')
        os.remove(state)

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file, ensure we don't try and continue.
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control, options)

        # Load in the user's control file, which may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError, instance:
        logging.error("JOB ERROR: " + instance.args[0])
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
                myjob.record('ABORT', None, command, instance.args[0])
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, instance.args[0])
            assert myjob.group_level == 0, ('myjob.group_level must be 0,'
                                            ' not %d' % myjob.group_level)
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us.  If we get here, it's likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, msg)
            assert myjob.group_level == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob._decrement_group_level()
    myjob.record('END GOOD', None, None)
    assert myjob.group_level == 0

    myjob.complete(0)


site_job = utils.import_site_class(
    __file__, "autotest_lib.client.bin.site_job", "site_job", base_job)

class job(site_job):
    pass
1359