job.py revision 7afc3a678bdf39549f6397a9b8d5d181a628e992
1"""The main job wrapper
2
3This is the core infrastructure.
4
5Copyright Andy Whitcroft, Martin J. Bligh 2006
6"""
7
8import copy, os, platform, re, shutil, sys, time, traceback, types
9import cPickle as pickle
10from autotest_lib.client.bin import autotest_utils, parallel, kernel, xen
11from autotest_lib.client.bin import profilers, fd_stack, boottool, harness
12from autotest_lib.client.bin import config, sysinfo, cpuset, test, partition
13from autotest_lib.client.common_lib import error, barrier, log, utils
14from autotest_lib.client.common_lib import packages, debug
15
16LAST_BOOT_TAG = object()
17NO_DEFAULT = object()
18JOB_PREAMBLE = """
19from autotest_lib.client.common_lib.error import *
20from autotest_lib.client.common_lib.utils import *
21from autotest_lib.client.bin.autotest_utils import *
22"""
23
24
25class StepError(error.AutotestError):
26    pass
27
28class NotAvailableError(error.AutotestError):
29    pass
30
31
32
33def _run_test_complete_on_exit(f):
34    """Decorator for job methods that automatically calls
35    self.harness.run_test_complete when the method exits, if appropriate."""
36    def wrapped(self, *args, **dargs):
37        try:
38            return f(self, *args, **dargs)
39        finally:
40            if self.log_filename == self.DEFAULT_LOG_FILENAME:
41                self.harness.run_test_complete()
42                if self.drop_caches:
43                    print "Dropping caches"
44                    autotest_utils.drop_caches()
45    wrapped.__name__ = f.__name__
46    wrapped.__doc__ = f.__doc__
47    wrapped.__dict__.update(f.__dict__)
48    return wrapped


class base_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
                    Comes from os.environ['AUTODIR'].
            bindir
                    <autodir>/bin/
            libdir
                    <autodir>/lib/
            testdir
                    <autodir>/tests/
            configdir
                    <autodir>/config/
            site_testdir
                    <autodir>/site_tests/
            profdir
                    <autodir>/profilers/
            tmpdir
                    <autodir>/tmp/
            pkgdir
                    <autodir>/packages/
            resultdir
                    <autodir>/results/<jobtag>
            toolsdir
                    <autodir>/tools/
            stdout
                    fd_stack object for stdout
            stderr
                    fd_stack object for stderr
            profilers
                    the profilers object for this job
            harness
                    the server harness object for this job
            config
                    the job configuration for this job
    """

    DEFAULT_LOG_FILENAME = "status"

    def __init__(self, control, jobtag, cont, harness_type=None,
                    use_external_logging=False, drop_caches=True):
        """
                control
                        The pathname of the control file
                jobtag
                        The job tag string (eg "default")
                cont
                        Whether this is the continuation of a previously
                        started job
                harness_type
                        An alternative server harness
                use_external_logging
                        If True, enable the external logging mechanism
                drop_caches
                        If True, drop kernel caches at job start and after
                        each test completes
        """
        self.drop_caches = drop_caches
        if self.drop_caches:
            print "Dropping caches"
            autotest_utils.drop_caches()

        self.autodir = os.environ['AUTODIR']
        self.bindir = os.path.join(self.autodir, 'bin')
        self.libdir = os.path.join(self.autodir, 'lib')
        self.testdir = os.path.join(self.autodir, 'tests')
        self.configdir = os.path.join(self.autodir, 'config')
        self.site_testdir = os.path.join(self.autodir, 'site_tests')
        self.profdir = os.path.join(self.autodir, 'profilers')
        self.tmpdir = os.path.join(self.autodir, 'tmp')
        self.toolsdir = os.path.join(self.autodir, 'tools')
        self.resultdir = os.path.join(self.autodir, 'results', jobtag)

        self.control = os.path.realpath(control)
        self._is_continuation = cont
        self.state_file = self.control + '.state'
        self.current_step_ancestry = []
        self.next_step_index = 0
        self.testtag = ''
        self._test_tag_prefix = ''

        self._load_state()
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':1800})
        self.pkgdir = os.path.join(self.autodir, 'packages')
        self.run_test_cleanup = self.get_state("__run_test_cleanup",
                                                default=True)

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        self.last_boot_tag = self.get_state("__last_boot_tag", default=None)
        self.job_log = debug.get_logger(module='client')

        if not cont:
            # Don't clean up the tmp dir (which contains the lockfile)
            # in the constructor; that would be a problem for multiple
            # jobs starting at the same time on the same client. Instead
            # the delete is done on the server side. We simply create the
            # tmp directory here if it does not already exist.
            if not os.path.exists(self.tmpdir):
                os.mkdir(self.tmpdir)

            if not os.path.exists(self.pkgdir):
                os.mkdir(self.pkgdir)

            results = os.path.join(self.autodir, 'results')
            if not os.path.exists(results):
                os.mkdir(results)

            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            if os.path.exists(self.resultdir):
                utils.system('rm -rf ' + self.resultdir)
            os.mkdir(self.resultdir)

            os.mkdir(os.path.join(self.resultdir, 'debug'))
            os.mkdir(os.path.join(self.resultdir, 'analysis'))

            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))


        self.control = control
        self.jobtag = jobtag
        self.log_filename = self.DEFAULT_LOG_FILENAME
        self.container = None

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        self._init_group_level()

        self.config = config.config(self)
        self.harness = harness.select(harness_type, self)
        self.profilers = profilers.profilers(self)

        try:
            tool = self.config_get('boottool.executable')
            self.bootloader = boottool.boottool(tool)
        except:
            pass

        self.sysinfo.log_per_reboot_data()

        if not cont:
            self.record('START', None, None)
            self._increment_group_level()

        self.harness.run_start()

        if use_external_logging:
            self.enable_external_logging()

        # load the max disk usage rate - default to no monitoring
        self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0)


    def monitor_disk_usage(self, max_rate):
        """\
        Signal that the job should monitor disk space usage on /
        and generate a warning if a test uses up disk space at a
        rate exceeding 'max_rate'.

        Parameters:
             max_rate - the maximum allowed rate of disk consumption
                        during a test, in MB/hour, or 0 to indicate
                        no limit.
        """
        self.set_state('__monitor_disk', max_rate)
        self.max_disk_usage_rate = max_rate


    def relative_path(self, path):
        """\
        Return a path relative to the job results directory
        """
        head = len(self.resultdir) + 1     # remove the '/' in between
        return path[head:]


    def control_get(self):
        return self.control


    def control_set(self, control):
        self.control = os.path.abspath(control)


    def harness_select(self, which):
        self.harness = harness.select(which, self)


    def config_set(self, name, value):
        self.config.set(name, value)


    def config_get(self, name):
        return self.config.get(name)


    def setup_dirs(self, results_dir, tmp_dir):
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
                            kjob = None ):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)


    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
                                  build_dir, leave)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory into which the source is actually
                      untarred (ex: client/profilers/<name> for profilers)
        '''
        if len(self.pkgmgr.repo_urls) > 0:
            self.pkgmgr.install_pkg(name, pkg_type,
                                    self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        must be a list of URL strings.
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        # TODO(aganti): Validate the list of the repository URLs.
        repositories = repo_urls + self.pkgmgr.repo_urls
        self.pkgmgr = packages.PackageManager(
            self.autodir, repo_urls=repositories,
            run_function_dargs={'timeout':1800})
        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE, checksum_file_path,
                                  use_checksum=False)
        except packages.PackageFetchError, e:
            # The packaging system might not be working in this case.
            # Silently fall back to the normal case.
            pass


    def require_gcc(self):
        """
        Check that gcc is installed; raise NotAvailableError if it is not.
        """
        try:
            utils.system('which gcc')
        except error.CmdError, e:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except packages.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            utils.system('./' + dep + '.py')
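
    # An illustrative sketch (assumed, not from the original file): tests that
    # need a prebuilt dependency call setup_dep() during their setup. The dep
    # name below is hypothetical and must match a directory under deps/.
    #
    #     job.setup_dep(['libsomething'])
    #     # afterwards the dep lives in <autodir>/deps/libsomething/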


    def _runtest(self, url, tag, args, dargs):
        try:
            try:
                l = lambda : test.runtest(self, url, tag, args, dargs)
                pid = parallel.fork_start(self.resultdir, l)
                parallel.fork_waitfor(self.resultdir, pid)
            except error.TestBaseException:
                raise
            except Exception, e:
                raise error.UnhandledTestError(e)
        finally:
            # Reset the logging level to client level
            debug.configure(module='client')


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
                url of the test to run
        tag
                optional keyword argument; tag to add to the test name
        """

        if not url:
            raise TypeError("Test name is invalid. "
                            "Switched arguments?")
        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
        namelen = len(testname)
        dargs = dargs.copy()
        tntag = dargs.pop('tag', None)
        if tntag:         # per-test tag is included in reported test name
            testname += '.' + str(tntag)
        run_number = self.get_run_number()
        if run_number:
            testname += '._%02d_' % run_number
            self.set_run_number(run_number + 1)
        if self._is_kernel_in_test_tag():
            testname += '.' + platform.release()
        if self._test_tag_prefix:
            testname += '.' + self._test_tag_prefix
        if self.testtag:  # job-level tag is included in reported test name
            testname += '.' + self.testtag
        subdir = testname
        sdtag = dargs.pop('subdir_tag', None)
        if sdtag:  # subdir-only tag is not included in reports
            subdir = subdir + '.' + str(sdtag)
        tag = subdir[namelen+1:]    # '' if none

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                    " already run with tag <%s>"
                    % (outputdir, testname, tag) )
            raise error.TestError(msg)
        # NOTE: client/common_lib/test.py runtest() depends on directory names
        # being constructed the same way as in this code.
        os.mkdir(outputdir)

        container = dargs.pop('container', None)
        if container:
            cname = container.get('name', None)
            if not cname:   # fall back to the old parameter name
                cname = container.get('container_name', None)
            mbytes = container.get('mbytes', None)
            if not mbytes:  # fall back to the old parameter name
                mbytes = container.get('mem', None)
            cpus  = container.get('cpus', None)
            if not cpus:    # fall back to the old parameter name
                cpus  = container.get('cpu', None)
            root  = container.get('root', None)
            network = container.get('network', None)
            disk = container.get('disk', None)
            try:
                self.new_container(mbytes=mbytes, cpus=cpus, root=root,
                                   name=cname, network=network, disk=disk)
            except cpuset.CpusetsNotAvailable, e:
                self.record("START", subdir, testname)
                self._increment_group_level()
                self.record("ERROR", subdir, testname, str(e))
                self._decrement_group_level()
                self.record("END ERROR", subdir, testname)
                return False
            # We are running in a container now...

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, args, dargs)
            except error.TestBaseException, detail:
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            except Exception, detail:
                self.record('FAIL', subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            self._rungroup(subdir, testname, group_func)
            if container:
                self.release_container()
            return True
        except error.TestBaseException, e:
            return False
        # Any other exception here will be given to the caller
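
    # A brief usage sketch (assumed, not part of the original file): control
    # files call run_test() with the test name plus optional keyword
    # arguments. 'sleeptest' and its 'seconds' argument are illustrative.
    #
    #     job.run_test('sleeptest', seconds=1)
    #     job.run_test('sleeptest', seconds=1, tag='second_run')
    #     job.run_test('sleeptest', container={'mbytes': 512, 'cpus': 2})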


    def _rungroup(self, subdir, testname, function, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed in function
        """

        try:
            self.record('START', subdir, testname)
            self._increment_group_level()
            result = function(*args, **dargs)
            self._decrement_group_level()
            self.record('END GOOD', subdir, testname)
            return result
        except error.TestBaseException, e:
            self._decrement_group_level()
            self.record('END %s' % e.exit_status, subdir, testname, str(e))
            raise
        except Exception, e:
            self._decrement_group_level()
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', subdir, testname, err_msg)
            raise


    def run_group(self, function, **dargs):
        """\
        function:
                subroutine to run
        **dargs:
                arguments for the function
        """

        # Allow the tag for the group to be specified
        tag = dargs.pop('tag', None)
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, **dargs)
        except error.TestError:
            pass
        # wrap any non-TestError exception in a TestError and re-raise it
        except Exception, e:
            exc_info = sys.exc_info()
            err = ''.join(traceback.format_exception(*exc_info))
            raise error.TestError(name + ' failed\n' + err)
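
    # A minimal sketch of intended usage (assumed, not from the original
    # file): group several related steps under one START/END record pair.
    # The kernel version below is illustrative.
    #
    #     def build_and_boot():
    #         testkernel = job.kernel('2.6.18')
    #         testkernel.build()
    #         testkernel.boot()
    #     job.run_group(build_and_boot, tag='kernelbuild')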


    _RUN_NUMBER_STATE = '__run_number'
    def get_run_number(self):
        """Get the run number or 0 if no run number has been set."""
        return self.get_state(self._RUN_NUMBER_STATE, default=0)


    def set_run_number(self, number):
        """If the run number is non-zero it will be in the output dir name."""
        self.set_state(self._RUN_NUMBER_STATE, number)


    _KERNEL_IN_TAG_STATE = '__kernel_version_in_test_tag'
    def _is_kernel_in_test_tag(self):
        """Boolean: should the kernel version be included in the test tag."""
        return self.get_state(self._KERNEL_IN_TAG_STATE, default=False)


    def show_kernel_in_test_tag(self, value=True):
        """If True, the kernel version at test time will prefix test tags."""
        self.set_state(self._KERNEL_IN_TAG_STATE, value)


    def set_test_tag(self, tag=''):
        """Set tag to be added to test name of all following run_test steps."""
        self.testtag = tag


    def set_test_tag_prefix(self, prefix):
        """Set a prefix string to prepend to all future run_test steps.

        Args:
          prefix: A string to prepend to any run_test() step tags separated by
              a '.'; use the empty string '' to clear it.
        """
        self._test_tag_prefix = prefix


    def new_container(self, mbytes=None, cpus=None, root=None, name=None,
                      network=None, disk=None, kswapd_merge=False):
        if not autotest_utils.grep('cpuset', '/proc/filesystems'):
            print "Containers not enabled by latest reboot"
            return  # containers weren't enabled in this kernel boot
        pid = os.getpid()
        if not name:
            name = 'test%d' % pid  # make arbitrary unique name
        self.container = cpuset.cpuset(name, job_size=mbytes, job_pid=pid,
                                       cpus=cpus, root=root, network=network,
                                       disk=disk, kswapd_merge=kswapd_merge)
        # This job's python shell is now running in the new container
        # and all forked test processes will inherit that container
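
    # A short illustrative sketch (assumed, not part of the original file):
    # confine the rest of the job, or a single test, to a cpuset container.
    # The sizes and CPU count below are arbitrary example values.
    #
    #     job.new_container(mbytes=1024, cpus=2, name='constrained')
    #     job.run_test('sleeptest')
    #     job.release_container()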


    def release_container(self):
        if self.container:
            self.container = self.container.release()


    def cpu_count(self):
        if self.container:
            return len(self.container.cpus)
        return autotest_utils.count_cpus()  # use total system count


    def start_reboot(self):
        self.record('START', None, 'reboot')
        self._increment_group_level()
        self.record('GOOD', None, 'reboot.start')


    def end_reboot(self, subdir, kernel, patches):
        kernel_info = {"kernel": kernel}
        for i, patch in enumerate(patches):
            kernel_info["patch%d" % i] = patch
        self._decrement_group_level()
        self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)


    def end_reboot_and_verify(self, expected_when, expected_id, subdir,
                              type='src', patches=[]):
        """ Check the passed kernel identifier against the command line
            and the running kernel, abort the job on mismatch. """

        print (("POST BOOT: checking booted kernel " +
                "mark=%d identity='%s' type='%s'") %
               (expected_when, expected_id, type))

        running_id = autotest_utils.running_os_ident()

        cmdline = utils.read_one_line("/proc/cmdline")

        find_sum = re.compile(r'.*IDENT=(\d+)')
        m = find_sum.match(cmdline)
        cmdline_when = -1
        if m:
            cmdline_when = int(m.groups()[0])

        # We have all the facts, see if they indicate we
        # booted the requested kernel or not.
        bad = False
        if (type == 'src' and expected_id != running_id or
            type == 'rpm' and
            not running_id.startswith(expected_id + '::')):
            print "check_kernel_ident: kernel identifier mismatch"
            bad = True
        if expected_when != cmdline_when:
            print "check_kernel_ident: kernel command line mismatch"
            bad = True

        if bad:
            print "   Expected Ident: " + expected_id
            print "    Running Ident: " + running_id
            print "    Expected Mark: %d" % (expected_when)
            print "Command Line Mark: %d" % (cmdline_when)
            print "     Command Line: " + cmdline

            self.record("ABORT", subdir, "reboot.verify", "boot failure")
            self._decrement_group_level()
            kernel = {"kernel": running_id.split("::")[0]}
            self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
            raise error.JobError("reboot returned with the wrong kernel")

        self.record('GOOD', subdir, 'reboot.verify', expected_id)
        self.end_reboot(subdir, expected_id, patches)


    def filesystem(self, device, mountpoint = None, loop_size = 0):
        if not mountpoint:
            mountpoint = self.tmpdir
        return partition.partition(self, device, mountpoint, loop_size)


    def enable_external_logging(self):
        pass


    def disable_external_logging(self):
        pass


    def enable_test_cleanup(self):
        """ Make tests run test.cleanup after they finish (the default). """
        self.set_state("__run_test_cleanup", True)
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """ Prevent tests from running test.cleanup after they finish. """
        self.set_state("__run_test_cleanup", False)
        self.run_test_cleanup = False


    def default_test_cleanup(self, val):
        if not self._is_continuation:
            self.set_state("__run_test_cleanup", val)
            self.run_test_cleanup = val


    def default_boot_tag(self, tag):
        if not self._is_continuation:
            self.set_state("__last_boot_tag", tag)
            self.last_boot_tag = tag


    def reboot_setup(self):
        pass


    def reboot(self, tag=LAST_BOOT_TAG):
        if tag == LAST_BOOT_TAG:
            tag = self.last_boot_tag
        else:
            self.set_state("__last_boot_tag", tag)
            self.last_boot_tag = tag

        self.reboot_setup()
        self.harness.run_reboot()
        default = self.config_get('boot.set_default')
        if default:
            self.bootloader.set_default(tag)
        else:
            self.bootloader.boot_once(tag)

        # HACK: netconsole, when loaded as a module, sometimes hangs shutdown,
        # so unload it first if it is installed
        utils.system("modprobe -r netconsole", ignore_status=True)

        cmd = "(sleep 5; reboot) </dev/null >/dev/null 2>&1 &"
        utils.system(cmd)
        self.quit()
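
    # A sketch of how reboot() is typically used (assumed, not from the
    # original file): because the client process exits across a reboot, the
    # control file must use the step engine so the job can resume afterwards.
    # The test name is illustrative.
    #
    #     def step_init():
    #         job.next_step(step_after_reboot)
    #         job.reboot()
    #
    #     def step_after_reboot():
    #         job.run_test('sleeptest')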


    def noop(self, text):
        print "job: noop: " + text


    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel.

        Each task is a (callable, arg1, arg2, ...) sequence; the callable is
        invoked in a forked subprocess with the remaining elements as
        positional arguments.
        """

        pids = []
        old_log_filename = self.log_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            self.log_filename = old_log_filename + (".%d" % i)
            task_func = lambda: task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self.log_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)
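
    # A small usage sketch (assumed, not part of the original file): each task
    # is a list or tuple whose first element is a callable and whose remaining
    # elements are its positional arguments. Test names are illustrative.
    #
    #     job.parallel([job.run_test, 'sleeptest'],
    #                  [job.run_test, 'dbench'])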


    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Clean up and exit"""
        # We are about to exit 'complete' so clean up the state file.
        dest = os.path.join(self.resultdir, os.path.basename(self.state_file))
        os.rename(self.state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def set_state(self, var, val):
        # Deep copies make sure that the state can't be altered
        # without it being re-written.  Perf wise, deep copies
        # are overshadowed by pickling/loading.
        self.state[var] = copy.deepcopy(val)
        outfile = open(self.state_file, 'wb')
        try:
            pickle.dump(self.state, outfile, pickle.HIGHEST_PROTOCOL)
        finally:
            outfile.close()
        print "Persistent state variable %s now set to %r" % (var, val)


    def _load_state(self):
        if hasattr(self, 'state'):
            raise RuntimeError('state already exists')
        infile = None
        try:
            try:
                infile = open(self.state_file, 'rb')
                self.state = pickle.load(infile)
            except IOError:
                self.state = {}
                initialize = True
        finally:
            if infile:
                infile.close()

        initialize = '__steps' not in self.state
        if not (self._is_continuation or initialize):
            raise RuntimeError('Loaded state contains __steps but this job '
                               'is not a continuation.')

        if initialize:
            print 'Initializing the state engine.'
            self.set_state('__steps', [])  # writes pickle file


    def get_state(self, var, default=NO_DEFAULT):
        if var in self.state or default == NO_DEFAULT:
            val = self.state[var]
        else:
            val = default
        return copy.deepcopy(val)


    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self.current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self.get_state('__steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self.set_state('__steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self.get_state('__steps')
        steps.insert(self.next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self.next_step_index += 1
        self.set_state('__steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self.get_state('__steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self.next_step_index += 1
        self.set_state('__steps', steps)
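
    # A sketch of a step-based control file (assumed, not from the original
    # file). Steps are recorded by name in the persistent state, so only
    # module-level functions in the control file can be used as steps. The
    # kernel version and test name are illustrative.
    #
    #     def step_init():
    #         job.next_step(step_build)
    #         job.next_step(step_test)
    #
    #     def step_build():
    #         testkernel = job.kernel('2.6.18')
    #         testkernel.build()
    #         testkernel.boot()        # job resumes at step_test after reboot
    #
    #     def step_test():
    #         job.run_test('sleeptest')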


    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
        return local_vars['__ret']


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self.current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Runs the remaining steps, one at a time.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        execfile(self.control, global_control_vars, global_control_vars)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self.get_state('__steps')) > 0:
            steps = self.get_state('__steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self.set_state('__steps', steps)

            self.next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self.current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        state = self.get_state("__sysinfo", None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self.set_state("__sysinfo", state)


    def _init_group_level(self):
        self.group_level = self.get_state("__group_level", default=0)


    def _increment_group_level(self):
        self.group_level += 1
        self.set_state("__group_level", self.group_level)


    def _decrement_group_level(self):
        self.group_level -= 1
        self.set_state("__group_level", self.group_level)


    def record(self, status_code, subdir, operation, status = '',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: (GOOD|WARN|FAIL|ABORT)
                or   START
                or   END (GOOD|WARN|FAIL|ABORT)

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping and are
        governed by self.group_level

        Multiline messages have secondary lines prefaced by a double
        space ('  ')
        """

        if subdir:
            if re.search(r'[\n\t]', subdir):
                raise ValueError("Invalid character in subdir string")
            substr = subdir
        else:
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError("Invalid status code supplied: %s" % status_code)
        if not operation:
            operation = '----'

        if re.search(r'[\n\t]', operation):
            raise ValueError("Invalid character in operation string")
        operation = operation.rstrip()

        if not optional_fields:
            optional_fields = {}

        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)

        # Generate timestamps for inclusion in the logs
        epoch_time = int(time.time())  # seconds since epoch, in UTC
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        msg = '\t'.join(str(x) for x in fields)
        msg = '\t' * self.group_level + msg

        msg_tag = ""
        if "." in self.log_filename:
            msg_tag = self.log_filename.split(".", 1)[1]

        self.harness.test_status_detail(status_code, substr, operation, status,
                                        msg_tag)
        self.harness.test_status(msg, msg_tag)

        # log to stdout (if enabled)
        #if self.log_filename == self.DEFAULT_LOG_FILENAME:
        print msg

        # log to the "root" status log
        status_file = os.path.join(self.resultdir, self.log_filename)
        open(status_file, "a").write(msg + "\n")

        # log to the subdir status log (if subdir is set)
        if subdir:
            dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(dir, self.DEFAULT_LOG_FILENAME)
            open(status_file, "a").write(msg + "\n")


class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        self.initial_space = autotest_utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = autotest_utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator
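
    # A small sketch of the decorator in use (assumed, mirroring how
    # run_test() applies it above); the rate threshold is illustrative.
    #
    #     @disk_usage_monitor.watch(logging_func, "/", 500)
    #     def do_something_disk_heavy():
    #         run_the_workload()
    #
    # While do_something_disk_heavy() runs, free space on "/" is sampled at
    # start and stop, and logging_func() is called with a warning message if
    # usage exceeded 500 MB/hour.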


def runjob(control, cont = False, tag = "default", harness_type = '',
           use_external_logging = False):
    """The main interface to this module

    control
            The control file to use for this job.
    cont
            Whether this is the continuation of a previously started job
    tag
            The job tag string (eg "default")
    harness_type
            An alternative server harness
    use_external_logging
            If True, enable the external logging mechanism
    """
    control = os.path.abspath(control)
    state = control + '.state'

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file; ensure we don't try to continue.
        if cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control, tag, cont, harness_type, use_external_logging)

        # Load in the user's control file, which may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError, instance:
        print "JOB ERROR: " + instance.args[0]
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
                myjob.record('ABORT', None, command, instance.args[0])
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, instance.args[0])
            assert(myjob.group_level == 0)
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        msg = str(e) + '\n' + traceback.format_exc()
        print "JOB ERROR: " + msg
        if myjob:
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, msg)
            assert(myjob.group_level == 0)
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob._decrement_group_level()
    myjob.record('END GOOD', None, None)
    assert(myjob.group_level == 0)

    myjob.complete(0)
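
# A minimal sketch of how this entry point is driven (assumed; in practice the
# autotest client wrapper calls it with real paths and options):
#
#     runjob('/usr/local/autotest/control', cont=False, tag='default')
#
# A JobContinue exit (status 5) signals the harness that the machine is about
# to reboot and that the job should be resumed afterwards with cont=True.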


# site_job.py may be non-existent or empty; make sure that an appropriate
# site_job class is created nevertheless
try:
    from autotest_lib.client.bin.site_job import site_job
except ImportError:
    class site_job(object):
        pass

class job(site_job, base_job):
    pass
1257