job.py revision b931b687e09cec9b25f37d437f33be73b6de20f3
1"""The main job wrapper
2
3This is the core infrastructure.
4"""
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""
7
8# standard stuff
9import os, sys, re, pickle, shutil, time, traceback, types, copy
10
11# autotest stuff
12from autotest_lib.client.bin import autotest_utils, parallel, kernel, xen
13from autotest_lib.client.bin import profilers, fd_stack, boottool, harness
14from autotest_lib.client.bin import config, sysinfo, cpuset, test, filesystem
15from autotest_lib.client.common_lib import error, barrier, log, utils
16from autotest_lib.client.common_lib import packages
17
18JOB_PREAMBLE = """
19from autotest_lib.client.common_lib.error import *
20from autotest_lib.client.common_lib.utils import *
21from autotest_lib.client.bin.autotest_utils import *
22"""
23
class StepError(error.AutotestError):
    """Raised for an invalid step definition in the step engine
    (see __create_step_tuple)."""
    pass
26
class NotAvailableError(error.AutotestError):
    """Raised when something the job requires (e.g. gcc) is not
    available on the machine."""
    pass
29
30
31
32def _run_test_complete_on_exit(f):
33    """Decorator for job methods that automatically calls
34    self.harness.run_test_complete when the method exits, if appropriate."""
35    def wrapped(self, *args, **dargs):
36        try:
37            return f(self, *args, **dargs)
38        finally:
39            if self.log_filename == self.DEFAULT_LOG_FILENAME:
40                self.harness.run_test_complete()
41    wrapped.__name__ = f.__name__
42    wrapped.__doc__ = f.__doc__
43    wrapped.__dict__.update(f.__dict__)
44    return wrapped
45
46
47class base_job(object):
48    """The actual job against which we do everything.
49
50    Properties:
51            autodir
52                    The top level autotest directory (/usr/local/autotest).
53                    Comes from os.environ['AUTODIR'].
54            bindir
55                    <autodir>/bin/
56            libdir
57                    <autodir>/lib/
58            testdir
59                    <autodir>/tests/
60            site_testdir
61                    <autodir>/site_tests/
62            profdir
63                    <autodir>/profilers/
64            tmpdir
65                    <autodir>/tmp/
66            pkgdir
67                    <autodir>/packages/
68            resultdir
69                    <autodir>/results/<jobtag>
70            toolsdir
71                    <autodir>/tools/
72            stdout
73                    fd_stack object for stdout
74            stderr
75                    fd_stack object for stderr
76            profilers
77                    the profilers object for this job
78            harness
79                    the server harness object for this job
80            config
81                    the job configuration for this job
82    """
83
84    DEFAULT_LOG_FILENAME = "status"
85
    def __init__(self, control, jobtag, cont, harness_type=None,
                    use_external_logging = False):
        """Set up job directories, persistent state, harness and profilers.

                control
                        The control file (pathname of)
                jobtag
                        The job tag string (eg "default")
                cont
                        If this is the continuation of this job
                harness_type
                        An alternative server harness
                use_external_logging
                        If True, the enable_external_logging hook is called
        """
        self.drop_caches = True
        # All standard directories hang off AUTODIR (see class docstring).
        self.autodir = os.environ['AUTODIR']
        self.bindir = os.path.join(self.autodir, 'bin')
        self.libdir = os.path.join(self.autodir, 'lib')
        self.testdir = os.path.join(self.autodir, 'tests')
        self.site_testdir = os.path.join(self.autodir, 'site_tests')
        self.profdir = os.path.join(self.autodir, 'profilers')
        self.tmpdir = os.path.join(self.autodir, 'tmp')
        self.toolsdir = os.path.join(self.autodir, 'tools')
        self.resultdir = os.path.join(self.autodir, 'results', jobtag)
        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.control = os.path.abspath(control)
        # Persistent job state lives next to the control file;
        # _load_state initializes self.state and self.state_existed.
        self.state_file = self.control + '.state'
        self.current_step_ancestry = []
        self.next_step_index = 0
        self.testtag = ''
        self._load_state()
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')
        self.run_test_cleanup = self.get_state("__run_test_cleanup",
                                                default=True)

        if not cont:
            """
            Don't cleanup the tmp dir (which contains the lockfile)
            in the constructor, this would be a problem for multiple
            jobs starting at the same time on the same client. Instead
            do the delete at the server side. We simply create the tmp
            directory here if it does not already exist.
            """
            if not os.path.exists(self.tmpdir):
                os.mkdir(self.tmpdir)

            if not os.path.exists(self.pkgdir):
                os.mkdir(self.pkgdir)

            results = os.path.join(self.autodir, 'results')
            if not os.path.exists(results):
                os.mkdir(results)

            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            # A fresh (non-continuation) job wipes any previous results
            # for this jobtag and rebuilds the results tree.
            if os.path.exists(self.resultdir):
                utils.system('rm -rf ' + self.resultdir)
            os.mkdir(self.resultdir)

            os.mkdir(os.path.join(self.resultdir, 'debug'))
            os.mkdir(os.path.join(self.resultdir, 'analysis'))

            # Keep a copy of the control file alongside the results.
            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))


        # NOTE(review): this resets self.control to the possibly-relative
        # path passed in, undoing the abspath above (state_file keeps the
        # absolute form) -- presumably intentional; confirm before changing.
        self.control = control
        self.jobtag = jobtag
        self.log_filename = self.DEFAULT_LOG_FILENAME
        self.container = None

        # Redirectable stdout/stderr stacks (see fd_stack).
        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        self._init_group_level()

        self.config = config.config(self)
        self.harness = harness.select(harness_type, self)
        self.profilers = profilers.profilers(self)

        # Best effort: a bootloader may not be configured/available.
        # NOTE(review): the bare except silently hides all errors here,
        # leaving self.bootloader unset on failure.
        try:
            tool = self.config_get('boottool.executable')
            self.bootloader = boottool.boottool(tool)
        except:
            pass

        self.sysinfo.log_per_reboot_data()

        if not cont:
            self.record('START', None, None)
            self._increment_group_level()

        self.harness.run_start()

        if use_external_logging:
            self.enable_external_logging()

        # load the max disk usage rate - default to no monitoring
        self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0)
187
188
189    def monitor_disk_usage(self, max_rate):
190        """\
191        Signal that the job should monitor disk space usage on /
192        and generate a warning if a test uses up disk space at a
193        rate exceeding 'max_rate'.
194
195        Parameters:
196             max_rate - the maximium allowed rate of disk consumption
197                        during a test, in MB/hour, or 0 to indicate
198                        no limit.
199        """
200        self.set_state('__monitor_disk', max_rate)
201        self.max_disk_usage_rate = max_rate
202
203
204    def relative_path(self, path):
205        """\
206        Return a patch relative to the job results directory
207        """
208        head = len(self.resultdir) + 1     # remove the / inbetween
209        return path[head:]
210
211
    def control_get(self):
        """Return the job's current control file path."""
        return self.control
214
215
216    def control_set(self, control):
217        self.control = os.path.abspath(control)
218
219
    def harness_select(self, which):
        """Switch this job over to the server harness named by which."""
        self.harness = harness.select(which, self)
222
223
    def config_set(self, name, value):
        """Set the job configuration entry name to value."""
        self.config.set(name, value)
226
227
228    def config_get(self, name):
229        return self.config.get(name)
230
231
232    def setup_dirs(self, results_dir, tmp_dir):
233        if not tmp_dir:
234            tmp_dir = os.path.join(self.tmpdir, 'build')
235        if not os.path.exists(tmp_dir):
236            os.mkdir(tmp_dir)
237        if not os.path.isdir(tmp_dir):
238            e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
239            raise ValueError(e_msg)
240
241        # We label the first build "build" and then subsequent ones
242        # as "build.2", "build.3", etc. Whilst this is a little bit
243        # inconsistent, 99.9% of jobs will only have one build
244        # (that's not done as kernbench, sparse, or buildtest),
245        # so it works out much cleaner. One of life's comprimises.
246        if not results_dir:
247            results_dir = os.path.join(self.resultdir, 'build')
248            i = 2
249            while os.path.exists(results_dir):
250                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
251                i += 1
252        if not os.path.exists(results_dir):
253            os.mkdir(results_dir)
254
255        return (results_dir, tmp_dir)
256
257
    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
                            kjob = None ):
        """Summon a xen object

        base_tree: source tree for the xen build
        results_dir/tmp_dir: forwarded through setup_dirs (auto-allocated
                when empty)
        leave, kjob: passed straight through to xen.xen
        """
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)
264                       leave, kjob)
265
266
    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
        """Summon a kernel object

        base_tree: source tree for the kernel build
        results_dir/tmp_dir: forwarded through setup_dirs (auto-allocated
                when empty)
        leave: passed straight through to kernel.auto_kernel
        """
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
                                  build_dir, leave)
272                                  build_dir, leave)
273
274
    def barrier(self, *args, **kwds):
        """Create a barrier object (all arguments are forwarded to
        barrier.barrier)."""
        return barrier.barrier(*args, **kwds)
277        return barrier.barrier(*args, **kwds)
278
279
280    def install_pkg(self, name, pkg_type, install_dir):
281        '''
282        This method is a simple wrapper around the actual package
283        installation method in the Packager class. This is used
284        internally by the profilers, deps and tests code.
285        name : name of the package (ex: sleeptest, dbench etc.)
286        pkg_type : Type of the package (ex: test, dep etc.)
287        install_dir : The directory in which the source is actually
288                      untarred into. (ex: client/profilers/<name> for profilers)
289        '''
290        if len(self.pkgmgr.repo_urls) > 0:
291            self.pkgmgr.install_pkg(name, pkg_type,
292                                    self.pkgdir, install_dir)
293
294
    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a string list
        Ex: job.add_repository(['http://blah1','http://blah2'])

        The new URLs are placed ahead of any previously configured
        repositories, so they are searched first.
        '''
        # TODO(aganti): Validate the list of the repository URLs.
        repositories = repo_urls + self.pkgmgr.repo_urls
        # Rebuild the package manager with the merged repository list.
        self.pkgmgr = packages.PackageManager(
            self.autodir, repo_urls=repositories,
            run_function_dargs={'timeout':600})
        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE, checksum_file_path,
                                  use_checksum=False)
        except packages.PackageFetchError, e:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass
321            pass
322
323
324    def require_gcc(self):
325        """
326        Test whether gcc is installed on the machine.
327        """
328        # check if gcc is installed on the system.
329        try:
330            utils.system('which gcc')
331        except error.CmdError, e:
332            raise NotAvailableError('gcc is required by this job and is '
333                                    'not available on the system')
334
335
    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.

        Each dep is fetched into <autodir>/deps/<dep> (when repositories
        are configured) and its <dep>.py setup script is executed.
        Raises error.TestError when a dep directory does not exist.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except packages.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            # NOTE(review): this chdir is never undone, so the process
            # working directory is left in the last dep dir -- confirm
            # callers tolerate that before changing.
            os.chdir(dep_dir)
            utils.system('./' + dep + '.py')
356            utils.system('./' + dep + '.py')
357
358
    def _runtest(self, url, tag, args, dargs):
        """Run one test in a forked subprocess (so a crashing test cannot
        take the job down), wrapping unexpected errors in
        UnhandledTestError.  Test-level exceptions propagate unchanged."""
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)
            parallel.fork_waitfor(self.resultdir, pid)
        except error.TestBaseException:
            raise
        except Exception, e:
            raise error.UnhandledTestError(e)
367            raise error.UnhandledTestError(e)
368
369
    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
                url of the test to run
        Optional dargs consumed here (not passed on to the test):
                tag - per-test tag, appended to the reported test name
                subdir_tag - appended only to the results subdir name
                container - dict of container parameters; when given the
                        test is run inside a freshly created container

        Returns True on success, False when the test failed with a
        TestBaseException (or the container could not be created);
        any other exception is re-raised.
        """

        if not url:
            raise TypeError("Test name is invalid. "
                            "Switched arguments?")
        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
        namelen = len(testname)
        dargs = dargs.copy()
        tntag = dargs.pop('tag', None)
        if tntag:         # per-test tag  is included in reported test name
            testname += '.' + str(tntag)
        if self.testtag:  # job-level tag is included in reported test name
            testname += '.' + self.testtag
        subdir = testname
        sdtag = dargs.pop('subdir_tag', None)
        if sdtag:  # subdir-only tag is not included in reports
            subdir = subdir + '.' + str(sdtag)
        tag = subdir[namelen+1:]    # '' if none

        # Refuse to clobber results from a previous run of the same test.
        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                    " already run with tag <%s>"
                    % (outputdir, testname, tag) )
            raise error.TestError(msg)
        os.mkdir(outputdir)

        container = dargs.pop('container', None)
        if container:
            # Accept both the new and the legacy key names for the
            # container parameters.
            cname = container.get('name', None)
            if not cname:   # get old name
                cname = container.get('container_name', None)
            mbytes = container.get('mbytes', None)
            if not mbytes:  # get old name
                mbytes = container.get('mem', None)
            cpus  = container.get('cpus', None)
            if not cpus:    # get old name
                cpus  = container.get('cpu', None)
            root  = container.get('root', None)
            network = container.get('network', None)
            disk = container.get('disk', None)
            try:
                self.new_container(mbytes=mbytes, cpus=cpus, root=root,
                                   name=cname, network=network, disk=disk)
            except cpuset.CpusetsNotAvailable, e:
                # Record the failure as a complete (errored) group and
                # report the test as not run.
                self.record("START", subdir, testname)
                self._increment_group_level()
                self.record("ERROR", subdir, testname, str(e))
                self._decrement_group_level()
                self.record("END ERROR", subdir, testname)
                return False
            # We are running in a container now...

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        # disk_usage_monitor is defined elsewhere in this module; it wraps
        # group_func with rate-of-disk-usage checking on "/".
        @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, args, dargs)
            except error.TestBaseException, detail:
                self.record(detail.exit_status, subdir, testname,
                            str(detail))
                raise
            except Exception, detail:
                self.record('FAIL', subdir, testname,
                            str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._rungroup(subdir, testname, group_func)
        if container:
            self.release_container()
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True
457            return True
458
459
    def _rungroup(self, subdir, testname, function, *args, **dargs):
        """\
        Run function bracketed by START / END status-log records.

        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns a 2-tuple (result, exc_info) where result
        is the return value of function, and exc_info is
        the sys.exc_info() of the exception thrown by the
        function (which may be None).
        """

        result, exc_info = None, None
        try:
            self.record('START', subdir, testname)
            self._increment_group_level()
            result = function(*args, **dargs)
            self._decrement_group_level()
            self.record('END GOOD', subdir, testname)
        except error.TestBaseException, e:
            # Test-level failures carry their own status for the END line.
            exc_info = sys.exc_info()
            self._decrement_group_level()
            self.record('END %s' % e.exit_status, subdir, testname, str(e))
        except Exception, e:
            # Anything else is recorded as END FAIL with a traceback.
            exc_info = sys.exc_info()
            self._decrement_group_level()
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', subdir, testname, err_msg)

        return result, exc_info
494        return result, exc_info
495
496
    def run_group(self, function, *args, **dargs):
        """\
        Run function as its own named result group.

        function:
                subroutine to run
        *args:
                arguments for the function
        dargs['tag'] (popped here):
                optional name override for the group

        Returns the function's return value.  A non-TestError
        exception from the function is re-raised wrapped in a
        TestError containing the formatted traceback.
        """

        # Allow the tag for the group to be specified
        name = function.__name__
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        # Refuse to clobber results from a previous group of the same name.
        outputdir = os.path.join(self.resultdir, name)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                    " already run with tag <%s>"
                    % (outputdir, name, name) )
            raise error.TestError(msg)
        os.mkdir(outputdir)

        result, exc_info = self._rungroup(name, name, function, *args, **dargs)

        # if there was a non-TestError exception, raise it
        if exc_info and not isinstance(exc_info[1], error.TestError):
            err = ''.join(traceback.format_exception(*exc_info))
            raise error.TestError(name + ' failed\n' + err)

        # pass back the actual return value from the function
        return result
527        return result
528
529
530    def set_test_tag(self, tag=''):
531        # set tag to be added to test name of all following run_test() steps
532        self.testtag = tag
533
534
    def new_container(self, mbytes=None, cpus=None, root=None, name=None,
                      network=None, disk=None, kswapd_merge=False):
        """Move this job (and all test processes it forks) into a new
        cpuset container.  Silently returns without creating one when
        the running kernel has no cpuset support.  All parameters are
        forwarded to cpuset.cpuset; name defaults to 'test<pid>'."""
        if not autotest_utils.grep('cpuset', '/proc/filesystems'):
            print "Containers not enabled by latest reboot"
            return  # containers weren't enabled in this kernel boot
        pid = os.getpid()
        if not name:
            name = 'test%d' % pid  # make arbitrary unique name
        self.container = cpuset.cpuset(name, job_size=mbytes, job_pid=pid,
                                       cpus=cpus, root=root, network=network,
                                       disk=disk, kswapd_merge=kswapd_merge)
        # This job's python shell is now running in the new container
        # and all forked test processes will inherit that container
547        # and all forked test processes will inherit that container
548
549
550    def release_container(self):
551        if self.container:
552            self.container = self.container.release()
553
554
    def cpu_count(self):
        """Return the number of CPUs available to this job: the
        container's CPU count when running inside one, otherwise the
        total system CPU count."""
        if self.container:
            return len(self.container.cpus)
        return autotest_utils.count_cpus()  # use total system count
558        return autotest_utils.count_cpus()  # use total system count
559
560
561    def end_reboot(self, subdir, kernel, patches):
562        kernel_info = {"kernel": kernel}
563        for i, patch in enumerate(patches):
564            kernel_info["patch%d" % i] = patch
565        self._decrement_group_level()
566        self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)
567
568
    def end_reboot_and_verify(self, expected_when, expected_id, subdir,
                              type='src', patches=[]):
        """ Check the passed kernel identifier against the command line
            and the running kernel, abort the job on missmatch.

            expected_when: the IDENT mark expected on /proc/cmdline
            expected_id: expected kernel identity string; matched
                    exactly for type 'src', or as an '<id>::' prefix
                    for type 'rpm'
            Raises error.JobError when the booted kernel is wrong.
            (Note: the default patches=[] is never mutated here.) """

        print (("POST BOOT: checking booted kernel " +
                "mark=%d identity='%s' type='%s'") %
               (expected_when, expected_id, type))

        running_id = autotest_utils.running_os_ident()

        cmdline = utils.read_one_line("/proc/cmdline")

        # The requested boot stamped IDENT=<mark> into the command line;
        # -1 means no mark was found.
        find_sum = re.compile(r'.*IDENT=(\d+)')
        m = find_sum.match(cmdline)
        cmdline_when = -1
        if m:
            cmdline_when = int(m.groups()[0])

        # We have all the facts, see if they indicate we
        # booted the requested kernel or not.
        bad = False
        if (type == 'src' and expected_id != running_id or
            type == 'rpm' and
            not running_id.startswith(expected_id + '::')):
            print "check_kernel_ident: kernel identifier mismatch"
            bad = True
        if expected_when != cmdline_when:
            print "check_kernel_ident: kernel command line mismatch"
            bad = True

        if bad:
            print "   Expected Ident: " + expected_id
            print "    Running Ident: " + running_id
            print "    Expected Mark: %d" % (expected_when)
            print "Command Line Mark: %d" % (cmdline_when)
            print "     Command Line: " + cmdline

            self.record("ABORT", subdir, "reboot.verify", "boot failure")
            self._decrement_group_level()
            kernel = {"kernel": running_id.split("::")[0]}
            self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
            raise error.JobError("reboot returned with the wrong kernel")

        self.record('GOOD', subdir, 'reboot.verify', expected_id)
        self.end_reboot(subdir, expected_id, patches)
614        self.end_reboot(subdir, expected_id, patches)
615
616
617    def filesystem(self, device, mountpoint = None, loop_size = 0):
618        if not mountpoint:
619            mountpoint = self.tmpdir
620        return filesystem.filesystem(self, device, mountpoint,loop_size)
621
622
    def enable_external_logging(self):
        """Start any external logging mechanism (hook; no-op here)."""
        pass
625
626
    def disable_external_logging(self):
        """Stop any external logging mechanism (hook; no-op here)."""
        pass
629
630
631    def enable_test_cleanup(self):
632        """ By default tests run test.cleanup """
633        self.set_state("__run_test_cleanup", True)
634        self.run_test_cleanup = True
635
636
637    def disable_test_cleanup(self):
638        """ By default tests do not run test.cleanup """
639        self.set_state("__run_test_cleanup", False)
640        self.run_test_cleanup = False
641
642
    def reboot_setup(self):
        """Hook called just before a reboot (no-op in base_job)."""
        pass
645
646
    def reboot(self, tag='autotest'):
        """Reboot the machine into the bootloader entry named tag,
        recording a reboot group in the status log.  Does not return:
        it finishes by calling self.quit(), which raises JobContinue."""
        self.reboot_setup()
        self.record('START', None, 'reboot')
        self._increment_group_level()
        self.record('GOOD', None, 'reboot.start')
        self.harness.run_reboot()
        # boot.set_default config chooses between making tag the
        # permanent default entry or a one-shot boot.
        default = self.config_get('boot.set_default')
        if default:
            self.bootloader.set_default(tag)
        else:
            self.bootloader.boot_once(tag)

        # HACK: using this as a module sometimes hangs shutdown, so if it's
        # installed unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)

        # Kick off the actual reboot in a detached background shell so
        # this process can wind down cleanly first.
        cmd = "(sleep 5; reboot) </dev/null >/dev/null 2>&1 &"
        utils.system(cmd)
        self.quit()
665        self.quit()
666
667
    def noop(self, text):
        """Do nothing except echo text to stdout (a placeholder step)."""
        print "job: noop: " + text
670
671
    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel

        Each task is a sequence whose first element is a callable and
        whose remaining elements are its arguments.  Every task runs in
        a forked subprocess with its own status log (<status>.<i>); the
        sub-logs are appended back into the main status log afterwards.
        Raises error.JobError if any task raised."""

        pids = []
        # Give each subtask its own log file, suffixed with its index.
        old_log_filename = self.log_filename
        for i, task in enumerate(tasklist):
            self.log_filename = old_log_filename + (".%d" % i)
            # Note: the lambda binds 'task' late, but fork_start consumes
            # it immediately within this same iteration, so each child
            # runs the correct task.
            task_func = lambda: task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self.log_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)
707            raise error.JobError(msg)
708
709
    def quit(self):
        """Pause the harness and stop executing the control file,
        signalling (via JobContinue) that the job resumes later."""
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")
713        raise error.JobContinue("more to come")
714
715
    def complete(self, status):
        """Clean up and exit the process with the given status code."""
        # We are about to exit 'complete' so clean up the control file.
        # Move the state file into the results dir so it is preserved.
        dest = os.path.join(self.resultdir, os.path.basename(self.state_file))
        os.rename(self.state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)
724        sys.exit(status)
725
726
727    def set_state(self, var, val):
728        # Deep copies make sure that the state can't be altered
729        # without it being re-written.  Perf wise, deep copies
730        # are overshadowed by pickling/loading.
731        self.state[var] = copy.deepcopy(val)
732        pickle.dump(self.state, open(self.state_file, 'w'))
733        print "Persistant state variable %s now set to %r" % (var, val)
734
735
    def _load_state(self):
        """Load persistent state from self.state_file, falling back to a
        fresh state (containing an empty '__steps' list) when the file
        is missing or unreadable.  Sets self.state_existed to indicate
        which case occurred.  May only be called once per job."""
        assert not hasattr(self, "state")
        try:
            self.state = pickle.load(open(self.state_file, 'r'))
            self.state_existed = True
        except Exception:
            # Any failure (no file, corrupt pickle, ...) means fresh start.
            print "Initializing the state engine."
            self.state = {}
            self.set_state('__steps', []) # writes pickle file
            self.state_existed = False
745            self.state_existed = False
746
747
748    def get_state(self, var, default=None):
749        if var in self.state or default == None:
750            val = self.state[var]
751        else:
752            val = default
753        return copy.deepcopy(val)
754
755
    def __create_step_tuple(self, fn, args, dargs):
        """Normalize a step spec into a picklable
        (ancestry, fn_name, args, dargs) tuple.

        Raises StepError when fn is neither callable nor a string
        naming a function."""
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self.current_step_ancestry)
        return (ancestry, fn, args, dargs)
773        return (ancestry, fn, args, dargs)
774
775
776    def next_step_append(self, fn, *args, **dargs):
777        """Define the next step and place it at the end"""
778        steps = self.get_state('__steps')
779        steps.append(self.__create_step_tuple(fn, args, dargs))
780        self.set_state('__steps', steps)
781
782
783    def next_step(self, fn, *args, **dargs):
784        """Create a new step and place it after any steps added
785        while running the current step but before any steps added in
786        previous steps"""
787        steps = self.get_state('__steps')
788        steps.insert(self.next_step_index,
789                     self.__create_step_tuple(fn, args, dargs))
790        self.next_step_index += 1
791        self.set_state('__steps', steps)
792
793
794    def next_step_prepend(self, fn, *args, **dargs):
795        """Insert a new step, executing first"""
796        steps = self.get_state('__steps')
797        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
798        self.next_step_index += 1
799        self.set_state('__steps', steps)
800
801
802    def _run_step_fn(self, local_vars, fn, args, dargs):
803        """Run a (step) function within the given context"""
804
805        local_vars['__args'] = args
806        local_vars['__dargs'] = dargs
807        exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
808        return local_vars['__ret']
809
810
    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at end end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined.

        Returns a (frame, ancestry) tuple, where ancestry may have had
        its deepest entries popped (it is modified in place)."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        # Re-run each ancestor steps-function; its returned locals()
        # become the next (child) frame.
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Pop frames (deepest first) while the parent frame defines the
        # same fn_name object -- i.e. until we reach the frame where the
        # function was first defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)
844        return (frames[-1], ancestry)
845
846
847    def _add_step_init(self, local_vars, current_function):
848        """If the function returned a dictionary that includes a
849        function named 'step_init', prepend it to our list of steps.
850        This will only get run the first time a function with a nested
851        use of the step engine is run."""
852
853        if (isinstance(local_vars, dict) and
854            'step_init' in local_vars and
855            callable(local_vars['step_init'])):
856            # The init step is a child of the function
857            # we were just running.
858            self.current_step_ancestry.append(current_function)
859            self.next_step_prepend('step_init')
860
861
862    def step_engine(self):
863        """the stepping engine -- if the control file defines
864        step_init we will be using this engine to drive multiple runs.
865        """
866        """Do the next step"""
867
868        # Set up the environment and then interpret the control file.
869        # Some control files will have code outside of functions,
870        # which means we need to have our state engine initialized
871        # before reading in the file.
872        global_control_vars = {'job': self}
873        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
874        execfile(self.control, global_control_vars, global_control_vars)
875
876        # If we loaded in a mid-job state file, then we presumably
877        # know what steps we have yet to run.
878        if not self.state_existed:
879            if global_control_vars.has_key('step_init'):
880                self.next_step(global_control_vars['step_init'])
881
882        # Iterate through the steps.  If we reboot, we'll simply
883        # continue iterating on the next step.
884        while len(self.get_state('__steps')) > 0:
885            steps = self.get_state('__steps')
886            (ancestry, fn_name, args, dargs) = steps.pop(0)
887            self.set_state('__steps', steps)
888
889            self.next_step_index = 0
890            ret = self._create_frame(global_control_vars, ancestry, fn_name)
891            local_vars, self.current_step_ancestry = ret
892            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
893            self._add_step_init(local_vars, fn_name)
894
895
    def _init_group_level(self):
        # Restore the status-log indent level from any saved job state;
        # a fresh job starts at level 0.
        self.group_level = self.get_state("__group_level", default=0)
898
899
900    def _increment_group_level(self):
901        self.group_level += 1
902        self.set_state("__group_level", self.group_level)
903
904
905    def _decrement_group_level(self):
906        self.group_level -= 1
907        self.set_state("__group_level", self.group_level)
908
909
910    def record(self, status_code, subdir, operation, status = '',
911               optional_fields=None):
912        """
913        Record job-level status
914
915        The intent is to make this file both machine parseable and
916        human readable. That involves a little more complexity, but
917        really isn't all that bad ;-)
918
919        Format is <status code>\t<subdir>\t<operation>\t<status>
920
921        status code: (GOOD|WARN|FAIL|ABORT)
922                or   START
923                or   END (GOOD|WARN|FAIL|ABORT)
924
925        subdir: MUST be a relevant subdirectory in the results,
926        or None, which will be represented as '----'
927
928        operation: description of what you ran (e.g. "dbench", or
929                                        "mkfs -t foobar /dev/sda9")
930
931        status: error message or "completed sucessfully"
932
933        ------------------------------------------------------------
934
935        Initial tabs indicate indent levels for grouping, and is
936        governed by self.group_level
937
938        multiline messages have secondary lines prefaced by a double
939        space ('  ')
940        """
941
942        if subdir:
943            if re.match(r'[\n\t]', subdir):
944                raise ValueError("Invalid character in subdir string")
945            substr = subdir
946        else:
947            substr = '----'
948
949        if not log.is_valid_status(status_code):
950            raise ValueError("Invalid status code supplied: %s" % status_code)
951        if not operation:
952            operation = '----'
953
954        if re.match(r'[\n\t]', operation):
955            raise ValueError("Invalid character in operation string")
956        operation = operation.rstrip()
957
958        if not optional_fields:
959            optional_fields = {}
960
961        status = status.rstrip()
962        status = re.sub(r"\r", "", status)
963        status = re.sub(r"\t", "  ", status)
964        # Ensure any continuation lines are marked so we can
965        # detect them in the status file to ensure it is parsable.
966        status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)
967
968        # Generate timestamps for inclusion in the logs
969        epoch_time = int(time.time())  # seconds since epoch, in UTC
970        local_time = time.localtime(epoch_time)
971        optional_fields["timestamp"] = str(epoch_time)
972        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
973                                                     local_time)
974
975        fields = [status_code, substr, operation]
976        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
977        fields.append(status)
978
979        msg = '\t'.join(str(x) for x in fields)
980        msg = '\t' * self.group_level + msg
981
982        msg_tag = ""
983        if "." in self.log_filename:
984            msg_tag = self.log_filename.split(".", 1)[1]
985
986        self.harness.test_status_detail(status_code, substr, operation, status,
987                                        msg_tag)
988        self.harness.test_status(msg, msg_tag)
989
990        # log to stdout (if enabled)
991        #if self.log_filename == self.DEFAULT_LOG_FILENAME:
992        print msg
993
994        # log to the "root" status log
995        status_file = os.path.join(self.resultdir, self.log_filename)
996        open(status_file, "a").write(msg + "\n")
997
998        # log to the subdir status log (if subdir is set)
999        if subdir:
1000            dir = os.path.join(self.resultdir, subdir)
1001            status_file = os.path.join(dir, self.DEFAULT_LOG_FILENAME)
1002            open(status_file, "a").write(msg + "\n")
1003
1004
class disk_usage_monitor:
    """Tracks free space consumed on a device between start() and stop(),
    and reports (via logging_func) if it was consumed faster than
    max_mb_per_hour."""

    def __init__(self, logging_func, device, max_mb_per_hour):
        # logging_func: one-argument callable used to emit the warning
        # device: the device/path whose free space is sampled
        # max_mb_per_hour: threshold rate; falsy disables the check
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        """Take the initial free-space and time snapshot."""
        self.initial_space = autotest_utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        """Take the final snapshot and warn if usage exceeded the limit."""
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = autotest_utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    # Propagate the wrapped function's return value
                    # (previously it was silently discarded).
                    return func(*args, **dargs)
                finally:
                    monitor.stop()
            # Preserve the wrapped function's identity, matching the
            # style of _run_test_complete_on_exit in this module.
            watched_func.__name__ = func.__name__
            watched_func.__doc__ = func.__doc__
            return watched_func
        return decorator
1057
1058
1059def runjob(control, cont = False, tag = "default", harness_type = '',
1060           use_external_logging = False):
1061    """The main interface to this module
1062
1063    control
1064            The control file to use for this job.
1065    cont
1066            Whether this is the continuation of a previously started job
1067    """
1068    control = os.path.abspath(control)
1069    state = control + '.state'
1070
1071    # instantiate the job object ready for the control file.
1072    myjob = None
1073    try:
1074        # Check that the control file is valid
1075        if not os.path.exists(control):
1076            raise error.JobError(control + ": control file not found")
1077
1078        # When continuing, the job is complete when there is no
1079        # state file, ensure we don't try and continue.
1080        if cont and not os.path.exists(state):
1081            raise error.JobComplete("all done")
1082        if cont == False and os.path.exists(state):
1083            os.unlink(state)
1084
1085        myjob = job(control, tag, cont, harness_type, use_external_logging)
1086
1087        # Load in the users control file, may do any one of:
1088        #  1) execute in toto
1089        #  2) define steps, and select the first via next_step()
1090        myjob.step_engine()
1091
1092    except error.JobContinue:
1093        sys.exit(5)
1094
1095    except error.JobComplete:
1096        sys.exit(1)
1097
1098    except error.JobError, instance:
1099        print "JOB ERROR: " + instance.args[0]
1100        if myjob:
1101            command = None
1102            if len(instance.args) > 1:
1103                command = instance.args[1]
1104                myjob.record('ABORT', None, command, instance.args[0])
1105            myjob._decrement_group_level()
1106            myjob.record('END ABORT', None, None, instance.args[0])
1107            assert(myjob.group_level == 0)
1108            myjob.complete(1)
1109        else:
1110            sys.exit(1)
1111
1112    except Exception, e:
1113        msg = str(e) + '\n' + traceback.format_exc()
1114        print "JOB ERROR: " + msg
1115        if myjob:
1116            myjob._decrement_group_level()
1117            myjob.record('END ABORT', None, None, msg)
1118            assert(myjob.group_level == 0)
1119            myjob.complete(1)
1120        else:
1121            sys.exit(1)
1122
1123    # If we get here, then we assume the job is complete and good.
1124    myjob._decrement_group_level()
1125    myjob.record('END GOOD', None, None)
1126    assert(myjob.group_level == 0)
1127
1128    myjob.complete(0)
1129
1130
# site_job.py may be non-existent or empty; make sure that an appropriate
# site_job class is created nevertheless, so that the 'job' class below
# always has something to inherit from.
try:
    from autotest_lib.client.bin.site_job import site_job
except ImportError:
    class site_job(object):
        pass

# The concrete job class: site-specific overrides (if any) take
# precedence over base_job via the method resolution order.
class job(site_job, base_job):
    pass
1141