# job.py revision fc3da5bbdcd63840cd224282812c050c5520db11
1"""The main job wrapper
2
3This is the core infrastructure.
4
5Copyright Andy Whitcroft, Martin J. Bligh 2006
6"""
7
8import copy, os, platform, re, shutil, sys, time, traceback, types, glob
9import logging, getpass, errno
10import cPickle as pickle
11from autotest_lib.client.bin import client_logging_config
12from autotest_lib.client.bin import utils, parallel, kernel, xen
13from autotest_lib.client.bin import profilers, boottool, harness
14from autotest_lib.client.bin import config, sysinfo, test, local_host
15from autotest_lib.client.bin import partition as partition_lib
16from autotest_lib.client.common_lib import base_job
17from autotest_lib.client.common_lib import error, barrier, log, logging_manager
18from autotest_lib.client.common_lib import base_packages, packages
19from autotest_lib.client.common_lib import global_config
20
21
22LAST_BOOT_TAG = object()
23JOB_PREAMBLE = """
24from autotest_lib.client.common_lib.error import *
25from autotest_lib.client.bin.utils import *
26"""
27
28
29class StepError(error.AutotestError):
30    pass
31
32class NotAvailableError(error.AutotestError):
33    pass
34
35
36
37def _run_test_complete_on_exit(f):
38    """Decorator for job methods that automatically calls
39    self.harness.run_test_complete when the method exits, if appropriate."""
40    def wrapped(self, *args, **dargs):
41        try:
42            return f(self, *args, **dargs)
43        finally:
44            if self._log_filename == self._DEFAULT_LOG_FILENAME:
45                self.harness.run_test_complete()
46                if self.drop_caches:
47                    logging.debug("Dropping caches")
48                    utils.drop_caches()
49    wrapped.__name__ = f.__name__
50    wrapped.__doc__ = f.__doc__
51    wrapped.__dict__.update(f.__dict__)
52    return wrapped
53
54
class base_client_job(base_job.base_job):
    """The client-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        control
        bootloader
        harness
    """

    _DEFAULT_LOG_FILENAME = "status"
    _WARNING_DISABLE_DELAY = 5


    def __init__(self, control, options, drop_caches=True,
                 extra_copy_cmdline=None):
        """
        Prepare a client side job object.

        @param control: The pathname of the control file.
        @param options: an object which includes:
                jobtag: The job tag string (e.g. "default").
                cont: Whether this is a continuation of a previous run of
                          this job.
                harness_type: An alternative server harness.  [None]
                use_external_logging: If true, the enable_external_logging
                          method will be called during construction.  [False]
        @param drop_caches: If true, utils.drop_caches() is called before and
                between all tests.  [True]
        @param extra_copy_cmdline: list of additional /proc/cmdline arguments
                to copy from the running kernel to all the kernels installed
                by this job.
        """
        super(base_client_job, self).__init__(options=options)
        self._pre_record_init(control, options)
        try:
            self._post_record_init(control, options, drop_caches,
                                   extra_copy_cmdline)
        except Exception, err:
            self.record(
                    'ABORT', None, None, 'client.bin.job.__init__ failed: %s' %
                    str(err))
            raise


    @classmethod
    def _get_environ_autodir(cls):
        return os.environ['AUTODIR']


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir and clientdir (which are the same)
        using os.environ. Serverdir does not exist in this context.
        """
        autodir = clientdir = cls._get_environ_autodir()
        return autodir, clientdir, None


    def _find_resultdir(self, options):
        """
        Determine the directory for storing results. On a client this is
        always <autodir>/results/<tag>, where tag is passed in on the command
        line as an option.
        """
        return os.path.join(self.autodir, 'results', options.tag)


    def _pre_record_init(self, control, options):
        """
        Initialization function that should perform ONLY the required
        setup so that the self.record() method works.

        As of now self.record() needs self.resultdir, self._group_level,
        self._log_filename, self.harness.
        """
        if not options.cont:
            self._cleanup_results_dir()

        logging_manager.configure_logging(
                client_logging_config.ClientLoggingConfig(),
                results_dir=self.resultdir,
                verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)

        self._log_filename = self._DEFAULT_LOG_FILENAME

        # _init_group_level needs the state loaded below
        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        self._current_step_ancestry = []
        self._next_step_index = 0
        self._load_state()

        self._init_group_level()

        self.harness = harness.select(options.harness, self)


    def _post_record_init(self, control, options, drop_caches,
                          extra_copy_cmdline):
        """
        Perform job initialization not required by self.record().
        """
        self._init_drop_caches(drop_caches)

        self._init_packages()

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        self.tag = self.get_state("__job_tag", default=None)

        if not options.cont:
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            os.makedirs(os.path.join(self.resultdir, 'analysis'))

            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))

        self.control = control

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self._config = config.config(self)
        self.profilers = profilers.profilers(self)

        self._init_bootloader()

        self.machines = [options.hostname]
        self.hosts = set([local_host.LocalHost(hostname=options.hostname,
                                               bootloader=self.bootloader)])

        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)
            self._increment_group_level()

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        # load the max disk usage rate - default to no monitoring
        self._max_disk_usage_rate = self.get_state('__monitor_disk',
                                                   default=0.0)

        self._init_cmdline(extra_copy_cmdline)

        self.num_tests_run = None
        self.num_tests_failed = None

        self.warning_loggers = None
        self.warning_manager = None


    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        self.drop_caches_between_iterations = (
                       global_config.global_config.get_config_value('CLIENT',
                                            'drop_caches_between_iterations',
                                            type=bool, default=True))
        self.drop_caches = drop_caches
        if self.drop_caches:
            logging.debug("Dropping caches")
            utils.drop_caches()


    def _init_bootloader(self):
        """
        Perform boottool initialization.
        """
        tool = self.config_get('boottool.executable')
        self.bootloader = boottool.boottool(tool)


    def _init_packages(self):
        """
        Perform the packages support initialization.
        """
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':3600})


    def _init_cmdline(self, extra_copy_cmdline):
        """
        Initialize default cmdline for booted kernels in this job.
        """
        copy_cmdline = set(['console'])
        if extra_copy_cmdline is not None:
            copy_cmdline.update(extra_copy_cmdline)

        # extract console= and other args from cmdline and add them into the
        # base args that we use for all kernels we install
        cmdline = utils.read_one_line('/proc/cmdline')
        kernel_args = []
        for karg in cmdline.split():
            for param in copy_cmdline:
                if karg.startswith(param) and \
                    (len(param) == len(karg) or karg[len(param)] == '='):
                    kernel_args.append(karg)
        self.config_set('boot.default_args', ' '.join(kernel_args))
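        # Illustrative sketch (not part of the original code): with a running
        # kernel cmdline of "root=/dev/sda1 console=ttyS0,115200 quiet" and the
        # default copy set of {'console'}, only "console=ttyS0,115200" passes
        # the startswith/'=' check above, so boot.default_args becomes
        # "console=ttyS0,115200".  Passing extra_copy_cmdline=['root'] to the
        # job constructor would carry "root=/dev/sda1" over as well.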


    def _cleanup_results_dir(self):
        """Delete everything in the results directory."""
        assert os.path.exists(self.resultdir)
        list_files = glob.glob('%s/*' % self.resultdir)
        for f in list_files:
            if os.path.isdir(f):
                shutil.rmtree(f)
            elif os.path.isfile(f):
                os.remove(f)


    def disable_warnings(self, warning_type):
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})
        time.sleep(self._WARNING_DISABLE_DELAY)


    def enable_warnings(self, warning_type):
        time.sleep(self._WARNING_DISABLE_DELAY)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def monitor_disk_usage(self, max_rate):
        """\
        Signal that the job should monitor disk space usage on /
        and generate a warning if a test uses up disk space at a
        rate exceeding 'max_rate'.

        Parameters:
             max_rate - the maximum allowed rate of disk consumption
                        during a test, in MB/hour, or 0 to indicate
                        no limit.
        """
        self.set_state('__monitor_disk', max_rate)
        self._max_disk_usage_rate = max_rate
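        # Illustrative usage sketch (not in the original source): a control
        # file could call job.monitor_disk_usage(500) so that run_test() warns
        # whenever a test consumes disk space on / faster than 500 MB/hour.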


    def relative_path(self, path):
        """\
        Return a path relative to the job results directory
        """
        head = len(self.resultdir) + 1     # remove the '/' in between
        return path[head:]


    def control_get(self):
        return self.control


    def control_set(self, control):
        self.control = os.path.abspath(control)


    def harness_select(self, which):
        self.harness = harness.select(which, self)


    def config_set(self, name, value):
        self._config.set(name, value)


    def config_get(self, name):
        return self._config.get(name)


    def setup_dirs(self, results_dir, tmp_dir):
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
                            kjob = None ):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)


    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
                                  build_dir, leave)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the PackageManager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory into which the source is untarred
                      (ex: client/profilers/<name> for profilers)
        '''
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        must be a list of URL strings, e.g.:
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file, which contains the checksums
        # of all the packages, if it is not already fetched. The checksum
        # file is fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              base_packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # The packaging system might not be working in this case;
            # silently fall back to the normal case.
            pass


    def require_gcc(self):
        """
        Check that gcc is installed on the machine; raise NotAvailableError
        if it is not.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            utils.system('./' + dep + '.py')
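        # Illustrative sketch (not in the original source; the dependency name
        # is hypothetical): job.setup_dep(['libaio']) would try to install the
        # 'libaio' package into <autodir>/deps/libaio (or fall back to a local
        # copy) and then run ./libaio.py from that directory to build it.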


    def _runtest(self, url, tag, args, dargs):
        try:
            l = lambda: test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)
            parallel.fork_waitfor(self.resultdir, pid)
        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Convert all other exceptions thrown by the test, regardless
            # of phase, into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url: A url that identifies the test to run.
        @param tag: An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag: An optional keyword argument that will be added
            to the subdir name.

        @returns True if the test passes, False otherwise.
        """
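        # Illustrative control-file usage (a sketch, not part of the original
        # source; the test names and extra arguments are hypothetical):
        #     job.run_test('sleeptest')
        #     job.run_test('dbench', tag='quick')
        # Remaining positional and keyword arguments are passed through to
        # test.runtest(); 'tag' and 'subdir_tag' only affect the test and
        # subdir naming described above.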
        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            self._rungroup(subdir, testname, group_func)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.


    def _rungroup(self, subdir, testname, function, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed in function
        """

        try:
            self.record('START', subdir, testname)
            self._increment_group_level()
            result = function(*args, **dargs)
            self._decrement_group_level()
            self.record('END GOOD', subdir, testname)
            return result
        except error.TestBaseException, e:
            self._decrement_group_level()
            self.record('END %s' % e.exit_status, subdir, testname)
            raise
        except error.JobError, e:
            self._decrement_group_level()
            self.record('END ABORT', subdir, testname)
            raise
        except Exception, e:
            # This should only ever happen due to a bug in the given
            # function's code.  The common case of being called by
            # run_test() will never reach this.  If a control file called
            # run_group() itself, bugs in its function will be caught
            # here.
            self._decrement_group_level()
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END ERROR', subdir, testname, err_msg)
            raise


    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group.  If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
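        # Illustrative usage sketch (not in the original source; build_stage is
        # a hypothetical control-file function):
        #     def build_stage(config='defconfig'):
        #         ...
        #     job.run_group(build_stage, tag='build', config='defconfig')
        # This records START/END lines around the call and converts unexpected
        # exceptions into an UnhandledTestError, as the code below shows.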
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)


    def cpu_count(self):
        return utils.count_cpus()  # use total system count


    def start_reboot(self):
        self.record('START', None, 'reboot')
        self._increment_group_level()
        self.record('GOOD', None, 'reboot.start')


    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        self.record("ABORT", subdir, operation, status)
        self._decrement_group_level()
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)


    def _check_post_reboot(self, subdir, running_id=None):
        """
        Perform post-boot checks, such as whether the mounted partition
        list has changed across the reboot.
        """
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = set((p.device, p.get_mountpoint()) for p in partition_list)
        old_mount_info = self.get_state("__mount_info")
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)


    def end_reboot(self, subdir, kernel, patches, running_id=None):
        self._check_post_reboot(subdir, running_id=running_id)

        # strip ::<timestamp> from the kernel version if present
        kernel = kernel.split("::")[0]
        kernel_info = {"kernel": kernel}
        for i, patch in enumerate(patches):
            kernel_info["patch%d" % i] = patch
        self._decrement_group_level()
        self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)


    def end_reboot_and_verify(self, expected_when, expected_id, subdir,
                              type='src', patches=[]):
        """ Check the passed kernel identifier against the command line
            and the running kernel, abort the job on mismatch. """

        logging.info("POST BOOT: checking booted kernel "
                     "mark=%d identity='%s' type='%s'",
                     expected_when, expected_id, type)

        running_id = utils.running_os_ident()

        cmdline = utils.read_one_line("/proc/cmdline")

        find_sum = re.compile(r'.*IDENT=(\d+)')
        m = find_sum.match(cmdline)
        cmdline_when = -1
        if m:
            cmdline_when = int(m.groups()[0])

        # We have all the facts, see if they indicate we
        # booted the requested kernel or not.
        bad = False
        if (type == 'src' and expected_id != running_id or
            type == 'rpm' and
            not running_id.startswith(expected_id + '::')):
            logging.error("Kernel identifier mismatch")
            bad = True
        if expected_when != cmdline_when:
            logging.error("Kernel command line mismatch")
            bad = True

        if bad:
            logging.error("   Expected Ident: " + expected_id)
            logging.error("    Running Ident: " + running_id)
            logging.error("    Expected Mark: %d", expected_when)
            logging.error("Command Line Mark: %d", cmdline_when)
            logging.error("     Command Line: " + cmdline)

            self._record_reboot_failure(subdir, "reboot.verify", "boot failure",
                                        running_id=running_id)
            raise error.JobError("Reboot returned with the wrong kernel")

        self.record('GOOD', subdir, 'reboot.verify',
                    utils.running_os_full_version())
        self.end_reboot(subdir, expected_id, patches, running_id=running_id)


    def partition(self, device, loop_size=0, mountpoint=None):
        """
        Work with a machine partition

            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
            @param mountpoint: Specify a directory to mount to. If not
                               specified, the autotest tmp directory will
                               be used.
            @param loop_size: Size of loopback device (in MB). Defaults to 0.

            @return: A L{client.bin.partition.partition} object
        """
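        # Illustrative usage sketch (not in the original source; the device
        # path is hypothetical):
        #     part = job.partition('/dev/sdb1')
        # This returns a partition_lib.partition object; since no mountpoint
        # was given, self.tmpdir is passed as the mountpoint.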

        if not mountpoint:
            mountpoint = self.tmpdir
        return partition_lib.partition(self, device, loop_size, mountpoint)

    @utils.deprecated
    def filesystem(self, device, mountpoint=None, loop_size=0):
        """ Same as partition

        @deprecated: Use partition method instead
        """
        return self.partition(device, loop_size, mountpoint)


    def enable_external_logging(self):
        pass


    def disable_external_logging(self):
        pass


    def default_tag(self, tag):
        """Allows the scheduler's job tag to be passed in from autoserv."""
        if not self._is_continuation:
            self.set_state("__job_tag", tag)
            self.tag = tag


    def reboot_setup(self):
        # save the partition list and mount points so we can compare them
        # after the reboot
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = set((p.device, p.get_mountpoint()) for p in partition_list)
        self.set_state("__mount_info", mount_info)


    def reboot(self, tag=LAST_BOOT_TAG):
        if tag == LAST_BOOT_TAG:
            tag = self.last_boot_tag
        else:
            self.last_boot_tag = tag

        self.reboot_setup()
        self.harness.run_reboot()
        default = self.config_get('boot.set_default')
        if default:
            self.bootloader.set_default(tag)
        else:
            self.bootloader.boot_once(tag)

        # HACK: the netconsole module sometimes hangs shutdown, so if it's
        # loaded, unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)

        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)

        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()


    def noop(self, text):
        logging.info("job: noop: " + text)


    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel"""

        pids = []
        old_log_filename = self._log_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            self._log_filename = old_log_filename + (".%d" % i)
            task_func = lambda: task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self._log_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)


    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Clean up and exit"""
        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.info('Initializing the state engine')
            self._state.set('client', 'steps', [])


    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)
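
    # Illustrative control-file sketch of the step engine (not in the original
    # source; step_one and step_two are hypothetical):
    #     def step_init():
    #         job.next_step(step_one)
    #         job.next_step(step_two)
    #     def step_one():
    #         job.run_test('sleeptest')
    #     def step_two():
    #         job.reboot()
    # Steps are stored by name in the persistent job state, so they survive a
    # reboot and step_engine() resumes with step_two on the next invocation.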



    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Runs the remaining steps, one at a time.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


    def _init_group_level(self):
        self._group_level = self.get_state("__group_level", default=0)


    def _increment_group_level(self):
        self._group_level += 1
        self.set_state("__group_level", self._group_level)


    def _decrement_group_level(self):
        self._group_level -= 1
        self.set_state("__group_level", self._group_level)


    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status.

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: (GOOD|WARN|FAIL|ABORT)
                or   START
                or   END (GOOD|WARN|FAIL|ABORT)

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping and are
        governed by self._group_level.

        Multiline messages have secondary lines prefaced by a double
        space ('  ').
        """
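        # Illustrative example of the resulting format (a sketch; the values
        # are hypothetical): a successful dbench test recorded at group level 1
        # would append a line like
        #     \tGOOD\tdbench\tdbench\ttimestamp=1234567890\tlocaltime=Jan 01 12:00:00\tcompleted successfully
        # to the "status" log, with one leading tab per group level.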

        if subdir:
            # re.search scans the whole string for embedded newlines/tabs
            if re.search(r'[\n\t]', subdir):
                raise ValueError("Invalid character in subdir string")
            substr = subdir
        else:
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError("Invalid status code supplied: %s" % status_code)
        if not operation:
            operation = '----'

        if re.search(r'[\n\t]', operation):
            raise ValueError("Invalid character in operation string")
        operation = operation.rstrip()

        if not optional_fields:
            optional_fields = {}

        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + "\t" * self._group_level + "  ", status)

        # Generate timestamps for inclusion in the logs
        epoch_time = int(time.time())  # seconds since epoch, in UTC
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        msg = '\t'.join(str(x) for x in fields)
        msg = '\t' * self._group_level + msg

        msg_tag = ""
        if "." in self._log_filename:
            msg_tag = self._log_filename.split(".", 1)[1]

        self.harness.test_status_detail(status_code, substr, operation, status,
                                        msg_tag)
        self.harness.test_status(msg, msg_tag)

        # log to stdout (if enabled)
        logging.info(msg)

        # log to the "root" status log
        status_file = os.path.join(self.resultdir, self._log_filename)
        open(status_file, "a").write(msg + "\n")

        # log to the subdir status log (if subdir is set)
        if subdir:
            dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(dir, self._DEFAULT_LOG_FILENAME)
            open(status_file, "a").write(msg + "\n")


class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator
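
    # Illustrative usage sketch (not in the original source): run_test() wraps
    # its group function with
    #     @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
    # so free space on "/" is sampled before and after the test and
    # log_warning() is called if the consumption rate exceeds the limit.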


def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
    """
    control = os.path.abspath(control)
    state = control + '.state'
    # Ensure the state file is cleaned up before the job starts to run if
    # autotest is not running with the --continue flag
    if not options.cont and os.path.isfile(state):
        logging.debug('Cleaning up previously found state file')
        os.remove(state)

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file; ensure we don't try to continue.
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control=control, drop_caches=drop_caches, options=options)

        # Load in the user's control file; it may do either of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError, instance:
        logging.error("JOB ERROR: " + instance.args[0])
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
                myjob.record('ABORT', None, command, instance.args[0])
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, instance.args[0])
            assert (myjob._group_level == 0), ('myjob._group_level must be 0,'
                                               ' not %d' % myjob._group_level)
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us.  If we get here, it's likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, msg)
            assert(myjob._group_level == 0)
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob._decrement_group_level()
    myjob.record('END GOOD', None, None)
    assert(myjob._group_level == 0)

    myjob.complete(0)


site_job = utils.import_site_class(
    __file__, "autotest_lib.client.bin.site_job", "site_job", base_client_job)

class job(site_job):
    pass