job.py revision c1f8cedaa425fa57813e4e8ce6db2b84676816ba
1"""The main job wrapper
2
3This is the core infrastructure.
4"""
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""
7
8# standard stuff
9import os, sys, re, pickle, shutil, time, traceback, types, copy
10
11# autotest stuff
12from autotest_lib.client.bin import autotest_utils, parallel, kernel, xen
13from autotest_lib.client.bin import profilers, fd_stack, boottool, harness
14from autotest_lib.client.bin import config, sysinfo, cpuset, test, filesystem
15from autotest_lib.client.common_lib import error, barrier, logging, utils
16
17JOB_PREAMBLE = """
18from autotest_lib.client.common_lib.error import *
19from autotest_lib.client.bin.autotest_utils import *
20"""
21
22class StepError(error.AutotestError):
23    pass
24
25
26class base_job(object):
27    """The actual job against which we do everything.
28
29    Properties:
30            autodir
31                    The top level autotest directory (/usr/local/autotest).
32                    Comes from os.environ['AUTODIR'].
33            bindir
34                    <autodir>/bin/
35            libdir
36                    <autodir>/lib/
37            testdir
38                    <autodir>/tests/
39            site_testdir
40                    <autodir>/site_tests/
41            profdir
42                    <autodir>/profilers/
43            tmpdir
44                    <autodir>/tmp/
45            resultdir
46                    <autodir>/results/<jobtag>
47            stdout
48                    fd_stack object for stdout
49            stderr
50                    fd_stack object for stderr
51            profilers
52                    the profilers object for this job
53            harness
54                    the server harness object for this job
55            config
56                    the job configuration for this job
57    """
58
59    DEFAULT_LOG_FILENAME = "status"
60
61    def __init__(self, control, jobtag, cont, harness_type=None,
62                    use_external_logging = False):
63        """
64                control
65                        The control file (pathname of)
66                jobtag
67                        The job tag string (eg "default")
68                cont
69                        If this is the continuation of this job
70                harness_type
71                        An alternative server harness
72        """
73        self.autodir = os.environ['AUTODIR']
74        self.bindir = os.path.join(self.autodir, 'bin')
75        self.libdir = os.path.join(self.autodir, 'lib')
76        self.testdir = os.path.join(self.autodir, 'tests')
77        self.site_testdir = os.path.join(self.autodir, 'site_tests')
78        self.profdir = os.path.join(self.autodir, 'profilers')
79        self.tmpdir = os.path.join(self.autodir, 'tmp')
80        self.resultdir = os.path.join(self.autodir, 'results', jobtag)
81        self.sysinfodir = os.path.join(self.resultdir, 'sysinfo')
82        self.control = os.path.abspath(control)
83        self.state_file = self.control + '.state'
84        self.current_step_ancestry = []
85        self.next_step_index = 0
86        self._load_state()
87
88        if not cont:
89            """
90            Don't cleanup the tmp dir (which contains the lockfile)
91            in the constructor, this would be a problem for multiple
92            jobs starting at the same time on the same client. Instead
93            do the delete at the server side. We simply create the tmp
94            directory here if it does not already exist.
95            """
96            if not os.path.exists(self.tmpdir):
97                os.mkdir(self.tmpdir)
98
99            results = os.path.join(self.autodir, 'results')
100            if not os.path.exists(results):
101                os.mkdir(results)
102
103            download = os.path.join(self.testdir, 'download')
104            if not os.path.exists(download):
105                os.mkdir(download)
106
107            if os.path.exists(self.resultdir):
108                utils.system('rm -rf ' + self.resultdir)
109            os.mkdir(self.resultdir)
110            os.mkdir(self.sysinfodir)
111
112            os.mkdir(os.path.join(self.resultdir, 'debug'))
113            os.mkdir(os.path.join(self.resultdir, 'analysis'))
114
115            shutil.copyfile(self.control,
116                            os.path.join(self.resultdir, 'control'))
117
118
119        self.control = control
120        self.jobtag = jobtag
121        self.log_filename = self.DEFAULT_LOG_FILENAME
122        self.container = None
123
124        self.stdout = fd_stack.fd_stack(1, sys.stdout)
125        self.stderr = fd_stack.fd_stack(2, sys.stderr)
126
127        self._init_group_level()
128
129        self.config = config.config(self)
130        self.harness = harness.select(harness_type, self)
131        self.profilers = profilers.profilers(self)
132
133        try:
134            tool = self.config_get('boottool.executable')
135            self.bootloader = boottool.boottool(tool)
136        except:
137            pass
138
139        sysinfo.log_per_reboot_data(self.sysinfodir)
140
141        if not cont:
142            self.record('START', None, None)
143            self._increment_group_level()
144
145        self.harness.run_start()
146
147        if use_external_logging:
148            self.enable_external_logging()
149
150        # load the max disk usage rate - default to no monitoring
151        self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0)
152
153
154    def monitor_disk_usage(self, max_rate):
155        """\
156        Signal that the job should monitor disk space usage on /
157        and generate a warning if a test uses up disk space at a
158        rate exceeding 'max_rate'.
159
160        Parameters:
161             max_rate - the maximium allowed rate of disk consumption
162                        during a test, in MB/hour, or 0 to indicate
163                        no limit.
164        """
165        self.set_state('__monitor_disk', max_rate)
166        self.max_disk_usage_rate = max_rate
167
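    # Usage sketch for the monitor above (illustrative values; 500 MB/hour
    # is an arbitrary threshold, not a project default):
    #
    #     job.monitor_disk_usage(500)   # warn if a test consumes / faster
    #                                   # than 500 MB/hour
    #     job.monitor_disk_usage(0)     # disable monitoring again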

    def relative_path(self, path):
        """\
        Return a path relative to the job results directory
        """
        head = len(self.resultdir) + 1     # remove the / in between
        return path[head:]


    def control_get(self):
        return self.control


    def control_set(self, control):
        self.control = os.path.abspath(control)


    def harness_select(self, which):
        self.harness = harness.select(which, self)


    def config_set(self, name, value):
        self.config.set(name, value)


    def config_get(self, name):
        return self.config.get(name)


    def setup_dirs(self, results_dir, tmp_dir):
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (one that's not done by kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
                            kjob = None ):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob)


    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
                                  build_dir, leave)

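    # Illustrative control-file sketch for the kernel() helper above. The
    # tarball URL and the config/build/boot calls on the returned kernel
    # object follow the usual autotest control-file pattern, but the exact
    # names and arguments here are assumptions, not taken from this file:
    #
    #     testkernel = job.kernel('http://example.com/linux-2.6.18.tar.bz2')
    #     testkernel.config('http://example.com/config')
    #     testkernel.build()
    #     testkernel.boot()         # reboots into the freshly built kernel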

    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def setup_dep(self, deps):
        """Set up the dependencies for this test.

        deps is a list of libraries required for this test.
        """
        for dep in deps:
            try:
                os.chdir(os.path.join(self.autodir, 'deps', dep))
                utils.system('./' + dep + '.py')
            except:
                err = "error setting up dependency " + dep + "\n"
                raise error.UnhandledError(err)


    def _runtest(self, url, tag, args, dargs):
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)
            parallel.fork_waitfor(self.resultdir, pid)
        except error.AutotestError:
            raise
        except Exception, e:
            msg = "Unhandled %s error occurred during test\n"
            msg %= str(e.__class__.__name__)
            raise error.UnhandledError(msg)


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
                url of the test to run
        tag
                tag to add to testname
        """

        if not url:
            raise TypeError("Test name is invalid. "
                            "Switched arguments?")
        (group, testname) = test.testname(url)
        namelen = len(testname)
        dargs = dargs.copy()
        tntag = dargs.pop('tag', None)
        if tntag:  # testname tag is included in reported test name
            testname += '.' + tntag
        subdir = testname
        sdtag = dargs.pop('subdir_tag', None)
        if sdtag:  # subdir-only tag is not included in reports
            subdir = subdir + '.' + sdtag
        tag = subdir[namelen+1:]    # '' if none

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                    " already run with tag <%s>"
                    % (outputdir, testname, tag) )
            raise error.TestError(msg)
        os.mkdir(outputdir)

        container = dargs.pop('container', None)
        if container:
            cname = container.get('name', None)
            if not cname:   # get old name
                cname = container.get('container_name', None)
            mbytes = container.get('mbytes', None)
            if not mbytes:  # get old name
                mbytes = container.get('mem', None)
            cpus  = container.get('cpus', None)
            if not cpus:    # get old name
                cpus  = container.get('cpu', None)
            root  = container.get('root', None)
            self.new_container(mbytes=mbytes, cpus=cpus,
                            root=root, name=cname)
            # We are running in a container now...

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)

        @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, args, dargs)
            except error.TestNAError, detail:
                self.record('TEST_NA', subdir, testname,
                            str(detail))
                raise
            except Exception, detail:
                self.record('FAIL', subdir, testname,
                            str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._rungroup(subdir, testname, group_func)
        if container:
            self.release_container()
        if exc_info and isinstance(exc_info[1], error.TestError):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True

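    # Hypothetical run_test() calls (test names and values are examples
    # only). 'tag' keeps repeated runs of the same test from colliding in
    # the results directory, and 'container' confines the test to a cpuset:
    #
    #     job.run_test('sleeptest', seconds=30, tag='short')
    #     job.run_test('sleeptest', seconds=600, tag='long')
    #     job.run_test('dbench', container={'name': 'small', 'mbytes': 512})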

    def _rungroup(self, subdir, testname, function, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns a 2-tuple (result, exc_info) where result
        is the return value of function, and exc_info is
        the sys.exc_info() of the exception thrown by the
        function (which may be None).
        """

        result, exc_info = None, None
        try:
            self.record('START', subdir, testname)
            self._increment_group_level()
            result = function(*args, **dargs)
            self._decrement_group_level()
            self.record('END GOOD', subdir, testname)
        except error.TestNAError, e:
            self._decrement_group_level()
            self.record('END TEST_NA', subdir, testname, str(e))
        except Exception, e:
            exc_info = sys.exc_info()
            self._decrement_group_level()
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', subdir, testname, err_msg)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function
        """

        # Allow the tag for the group to be specified
        name = function.__name__
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        outputdir = os.path.join(self.resultdir, name)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                    " already run with tag <%s>"
                    % (outputdir, name, name) )
            raise error.TestError(msg)
        os.mkdir(outputdir)

        result, exc_info = self._rungroup(name, name, function, *args, **dargs)

        # if there was a non-TestError exception, raise it
        if exc_info and not isinstance(exc_info[1], error.TestError):
            err = ''.join(traceback.format_exception(*exc_info))
            raise error.TestError(name + ' failed\n' + err)

        # pass back the actual return value from the function
        return result

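    # Hypothetical run_group() usage: group several related tests under a
    # single START/END block in the status log (the function and test names
    # below are illustrative):
    #
    #     def network_tests():
    #         job.run_test('netperf2', tag='tcp')
    #         job.run_test('netperf2', tag='udp')
    #
    #     job.run_group(network_tests, tag='network')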

    def new_container(self, mbytes=None, cpus=None, root=None, name=None):
        if not autotest_utils.grep('cpuset', '/proc/filesystems'):
            print "Containers not enabled by latest reboot"
            return  # containers weren't enabled in this kernel boot
        pid = os.getpid()
        if not name:
            name = 'test%d' % pid  # make arbitrary unique name
        self.container = cpuset.cpuset(name, job_size=mbytes, job_pid=pid,
                                       cpus=cpus, root=root)
        # This job's python shell is now running in the new container
        # and all forked test processes will inherit that container

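    # Illustrative container usage (the size and the name are made up);
    # run_test()'s 'container' argument wraps this same pair of calls:
    #
    #     job.new_container(mbytes=1024, name='mygroup')
    #     # ... run memory-sensitive work confined to the cpuset ...
    #     job.release_container()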

    def release_container(self):
        if self.container:
            self.container.release()
            self.container = None


    def cpu_count(self):
        if self.container:
            return len(self.container.cpus)
        return autotest_utils.count_cpus()  # use total system count


    # Check the passed kernel identifier against the command line
    # and the running kernel, abort the job on mismatch.
    def kernel_check_ident(self, expected_when, expected_id, subdir,
                           type = 'src', patches=[]):
        print (("POST BOOT: checking booted kernel " +
                "mark=%d identity='%s' type='%s'") %
               (expected_when, expected_id, type))

        running_id = autotest_utils.running_os_ident()

        cmdline = utils.read_one_line("/proc/cmdline")

        find_sum = re.compile(r'.*IDENT=(\d+)')
        m = find_sum.match(cmdline)
        cmdline_when = -1
        if m:
            cmdline_when = int(m.groups()[0])

        # We have all the facts, see if they indicate we
        # booted the requested kernel or not.
        bad = False
        if (type == 'src' and expected_id != running_id or
            type == 'rpm' and
            not running_id.startswith(expected_id + '::')):
            print "check_kernel_ident: kernel identifier mismatch"
            bad = True
        if expected_when != cmdline_when:
            print "check_kernel_ident: kernel command line mismatch"
            bad = True

        if bad:
            print "   Expected Ident: " + expected_id
            print "    Running Ident: " + running_id
            print "    Expected Mark: %d" % (expected_when)
            print "Command Line Mark: %d" % (cmdline_when)
            print "     Command Line: " + cmdline

            raise error.JobError("boot failure", "reboot.verify")

        kernel_info = {'kernel': expected_id}
        for i, patch in enumerate(patches):
            kernel_info["patch%d" % i] = patch
        self.record('GOOD', subdir, 'reboot.verify', expected_id)
        self._decrement_group_level()
        self.record('END GOOD', subdir, 'reboot',
                    optional_fields=kernel_info)


    def filesystem(self, device, mountpoint = None, loop_size = 0):
        if not mountpoint:
            mountpoint = self.tmpdir
        return filesystem.filesystem(self, device, mountpoint, loop_size)


    def enable_external_logging(self):
        pass


    def disable_external_logging(self):
        pass


    def reboot_setup(self):
        pass


    def reboot(self, tag='autotest'):
        self.reboot_setup()
        self.record('START', None, 'reboot')
        self._increment_group_level()
        self.record('GOOD', None, 'reboot.start')
        self.harness.run_reboot()
        default = self.config_get('boot.set_default')
        if default:
            self.bootloader.set_default(tag)
        else:
            self.bootloader.boot_once(tag)
        cmd = "(sleep 5; reboot) </dev/null >/dev/null 2>&1 &"
        utils.system(cmd)
        self.quit()


    def noop(self, text):
        print "job: noop: " + text


    def parallel(self, *tasklist):
        """Run tasks in parallel"""

        pids = []
        old_log_filename = self.log_filename
        for i, task in enumerate(tasklist):
            self.log_filename = old_log_filename + (".%d" % i)
            task_func = lambda: task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self.log_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed" % len(exceptions)
            raise error.JobError(msg, str(exceptions), exceptions)

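    # Hypothetical parallel() usage. Each task is a sequence whose first
    # element is a callable and whose remaining elements are its arguments;
    # every task runs in its own forked subprocess with its own status log
    # (the test names are examples only):
    #
    #     job.parallel([job.run_test, 'dbench'],
    #                  [job.run_test, 'bonnie'])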

    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Clean up and exit"""
        # We are about to exit 'complete' so clean up the state file.
        try:
            os.unlink(self.state_file)
        except:
            pass

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def set_state(self, var, val):
        # Deep copies make sure that the state can't be altered
        # without it being re-written.  Perf-wise, deep copies
        # are overshadowed by pickling/loading.
        self.state[var] = copy.deepcopy(val)
        pickle.dump(self.state, open(self.state_file, 'w'))


    def _load_state(self):
        assert not hasattr(self, "state")
        try:
            self.state = pickle.load(open(self.state_file, 'r'))
            self.state_existed = True
        except Exception:
            print "Initializing the state engine."
            self.state = {}
            self.set_state('__steps', []) # writes pickle file
            self.state_existed = False


    def get_state(self, var, default=None):
        if var in self.state or default is None:
            val = self.state[var]
        else:
            val = default
        return copy.deepcopy(val)

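    # Minimal sketch of the persistent-state helpers above (the key name is
    # arbitrary). Values survive reboots because they are pickled into the
    # <control>.state file:
    #
    #     job.set_state('my_counter', 3)
    #     count = job.get_state('my_counter', default=0)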

    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, so we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self.current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self.get_state('__steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self.set_state('__steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self.get_state('__steps')
        steps.insert(self.next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self.next_step_index += 1
        self.set_state('__steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self.get_state('__steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self.next_step_index += 1
        self.set_state('__steps', steps)

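    # Illustrative control-file sketch of the step engine. Steps are queued
    # by name, persisted in the state file, and resumed across reboots; the
    # function names, kernel tarball, and test names below are examples only:
    #
    #     def step_init():
    #         job.next_step(step_build)
    #         job.next_step(step_test)
    #
    #     def step_build():
    #         testkernel = job.kernel('linux-2.6.18.tar.bz2')
    #         testkernel.build()
    #         testkernel.boot()     # job resumes at step_test after reboot
    #
    #     def step_test():
    #         job.run_test('kernbench')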

    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
        return local_vars['__ret']


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self.current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The stepping engine -- if the control file defines step_init
        we will be using this engine to drive multiple runs; each call
        executes the remaining queued steps.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        execfile(self.control, global_control_vars, global_control_vars)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self.state_existed:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self.get_state('__steps')) > 0:
            steps = self.get_state('__steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self.set_state('__steps', steps)

            self.next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self.current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def _init_group_level(self):
        self.group_level = self.get_state("__group_level", default=0)


    def _increment_group_level(self):
        self.group_level += 1
        self.set_state("__group_level", self.group_level)


    def _decrement_group_level(self):
        self.group_level -= 1
        self.set_state("__group_level", self.group_level)


    def record(self, status_code, subdir, operation, status = '',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: (GOOD|WARN|FAIL|ABORT)
                or   START
                or   END (GOOD|WARN|FAIL|ABORT)

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping; the indent
        level is governed by self.group_level.

        Multiline messages have secondary lines prefixed by a double
        space ('  ')
        """

        if subdir:
            if re.search(r'[\n\t]', subdir):
                raise ValueError("Invalid character in subdir string")
            substr = subdir
        else:
            substr = '----'

        if not logging.is_valid_status(status_code):
            raise ValueError("Invalid status code supplied: %s" % status_code)
        if not operation:
            operation = '----'

        if re.search(r'[\n\t]', operation):
            raise ValueError("Invalid character in operation string")
        operation = operation.rstrip()

        if not optional_fields:
            optional_fields = {}

        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)

        # Generate timestamps for inclusion in the logs
        epoch_time = int(time.time())  # seconds since epoch, in UTC
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        msg = '\t'.join(str(x) for x in fields)
        msg = '\t' * self.group_level + msg

        msg_tag = ""
        if "." in self.log_filename:
            msg_tag = self.log_filename.split(".", 1)[1]

        self.harness.test_status_detail(status_code, substr, operation, status,
                                        msg_tag)
        self.harness.test_status(msg, msg_tag)

        # log to stdout (if enabled)
        #if self.log_filename == self.DEFAULT_LOG_FILENAME:
        print msg

        # log to the "root" status log
        status_file = os.path.join(self.resultdir, self.log_filename)
        open(status_file, "a").write(msg + "\n")

        # log to the subdir status log (if subdir is set)
        if subdir:
            dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(dir, self.DEFAULT_LOG_FILENAME)
            open(status_file, "a").write(msg + "\n")

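# For reference, a record() call such as
#     job.record('GOOD', 'dbench', 'dbench', 'completed successfully')
# appends a tab-separated line of roughly this shape to the status log
# ('<tab>' stands for a literal tab, timestamps elided):
#
#     GOOD<tab>dbench<tab>dbench<tab>timestamp=...<tab>localtime=...<tab>completed successfully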

class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        self.initial_space = autotest_utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = autotest_utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator

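# Hypothetical standalone use of the watch() decorator above (run_test
# already applies it for you); the device and rate here are example values:
#
#     def log_warning(msg):
#         print "WARNING: " + msg
#
#     @disk_usage_monitor.watch(log_warning, "/", 100)
#     def do_heavy_io():
#         ...   # any work whose disk consumption should be checked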

def runjob(control, cont = False, tag = "default", harness_type = '',
           use_external_logging = False):
    """The main interface to this module

    control
            The control file to use for this job.
    cont
            Whether this is the continuation of a previously started job
    """
    control = os.path.abspath(control)
    state = control + '.state'

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file; ensure we don't try to continue.
        if cont and not os.path.exists(state):
            raise error.JobComplete("all done")
        if cont == False and os.path.exists(state):
            os.unlink(state)

        myjob = job(control, tag, cont, harness_type, use_external_logging)

        # Load in the user's control file; it may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError, instance:
        print "JOB ERROR: " + instance.args[0]
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
            myjob.record('ABORT', None, command, instance.args[0])
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None)
            assert(myjob.group_level == 0)
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        msg = str(e) + '\n' + traceback.format_exc()
        print "JOB ERROR: " + msg
        if myjob:
            myjob.record('ABORT', None, None, msg)
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None)
            assert(myjob.group_level == 0)
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob._decrement_group_level()
    myjob.record('END GOOD', None, None)
    assert(myjob.group_level == 0)

    myjob.complete(0)

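# runjob() is normally invoked by the autotest launcher; a direct call would
# look roughly like this (the control-file path is an example):
#
#     runjob('/usr/local/autotest/control', cont=False, tag='default')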

# site_job.py may be non-existent or empty; make sure that an appropriate
# site_job class is created nevertheless
try:
    from site_job import site_job
except ImportError:
    class site_job(base_job):
        pass

class job(site_job):
    pass