job.py revision a700772e70548b10721ba709c7ec5194bb6d7597
1"""The main job wrapper
2
3This is the core infrastructure.
4
5Copyright Andy Whitcroft, Martin J. Bligh 2006
6"""
7
8import copy, os, platform, re, shutil, sys, time, traceback, types
9import cPickle as pickle
10from autotest_lib.client.bin import utils, parallel, kernel, xen
11from autotest_lib.client.bin import profilers, fd_stack, boottool, harness
12from autotest_lib.client.bin import config, sysinfo, cpuset, test, partition
13from autotest_lib.client.common_lib import error, barrier, log
14from autotest_lib.client.common_lib import packages, debug
15
16LAST_BOOT_TAG = object()
17NO_DEFAULT = object()
18JOB_PREAMBLE = """
19from autotest_lib.client.common_lib.error import *
20from autotest_lib.client.bin.utils import *
21"""
22
23
24class StepError(error.AutotestError):
25    pass
26
27class NotAvailableError(error.AutotestError):
28    pass
29
30
31
32def _run_test_complete_on_exit(f):
33    """Decorator for job methods that automatically calls
34    self.harness.run_test_complete when the method exits, if appropriate."""
35    def wrapped(self, *args, **dargs):
36        try:
37            return f(self, *args, **dargs)
38        finally:
39            if self.log_filename == self.DEFAULT_LOG_FILENAME:
40                self.harness.run_test_complete()
41                if self.drop_caches:
42                    print "Dropping caches"
43                    utils.drop_caches()
44    wrapped.__name__ = f.__name__
45    wrapped.__doc__ = f.__doc__
46    wrapped.__dict__.update(f.__dict__)
47    return wrapped
48
49
50class base_job(object):
51    """The actual job against which we do everything.
52
53    Properties:
54            autodir
55                    The top level autotest directory (/usr/local/autotest).
56                    Comes from os.environ['AUTODIR'].
57            bindir
58                    <autodir>/bin/
59            libdir
60                    <autodir>/lib/
61            testdir
62                    <autodir>/tests/
63            configdir
64                    <autodir>/config/
65            site_testdir
66                    <autodir>/site_tests/
67            profdir
68                    <autodir>/profilers/
69            tmpdir
70                    <autodir>/tmp/
71            pkgdir
72                    <autodir>/packages/
73            resultdir
74                    <autodir>/results/<jobtag>
75            toolsdir
76                    <autodir>/tools/
77            stdout
78                    fd_stack object for stdout
79            stderr
80                    fd_stack object for stderr
81            profilers
82                    the profilers object for this job
83            harness
84                    the server harness object for this job
85            config
86                    the job configuration for this job
87            drop_caches_between_iterations
88                    drop the pagecache between each iteration
89    """
90
91    DEFAULT_LOG_FILENAME = "status"
92
93    def __init__(self, control, jobtag, cont, harness_type=None,
94                 use_external_logging=False, drop_caches=True):
95        """
96        Prepare a client side job object.
97
98        Args:
99          control: The control file (pathname of).
100          jobtag: The job tag string (eg "default").
101          cont: Whether this is a continuation of a previous run of this job.
102          harness_type: An alternative server harness.  [None]
103          use_external_logging: If true, the enable_external_logging
104                  method will be called during construction.  [False]
105          drop_caches: If true, utils.drop_caches() is
106                  called before and between all tests.  [True]
107        """
108        self.drop_caches = drop_caches
109        if self.drop_caches:
110            print "Dropping caches"
111            utils.drop_caches()
112        self.drop_caches_between_iterations = False
113        self.autodir = os.environ['AUTODIR']
114        self.bindir = os.path.join(self.autodir, 'bin')
115        self.libdir = os.path.join(self.autodir, 'lib')
116        self.testdir = os.path.join(self.autodir, 'tests')
117        self.configdir = os.path.join(self.autodir, 'config')
118        self.site_testdir = os.path.join(self.autodir, 'site_tests')
119        self.profdir = os.path.join(self.autodir, 'profilers')
120        self.tmpdir = os.path.join(self.autodir, 'tmp')
121        self.toolsdir = os.path.join(self.autodir, 'tools')
122        self.resultdir = os.path.join(self.autodir, 'results', jobtag)
123
124        self.control = os.path.realpath(control)
125        self._is_continuation = cont
126        self.state_file = self.control + '.state'
127        self.current_step_ancestry = []
128        self.next_step_index = 0
129        self.testtag = ''
130        self._test_tag_prefix = ''
131
132        self._load_state()
133        self.pkgmgr = packages.PackageManager(
134            self.autodir, run_function_dargs={'timeout':1800})
135        self.pkgdir = os.path.join(self.autodir, 'packages')
136        self.run_test_cleanup = self.get_state("__run_test_cleanup",
137                                                default=True)
138
139        self.sysinfo = sysinfo.sysinfo(self.resultdir)
140        self._load_sysinfo_state()
141
142        self.last_boot_tag = self.get_state("__last_boot_tag", default=None)
143        self.job_log = debug.get_logger(module='client')
144
145        if not cont:
146            """
147            Don't cleanup the tmp dir (which contains the lockfile)
148            in the constructor; this would be a problem for multiple
149            jobs starting at the same time on the same client. Instead
150            the delete is done at the server side. We simply create
151            the tmp directory here if it does not already exist.
152            """
153            if not os.path.exists(self.tmpdir):
154                os.mkdir(self.tmpdir)
155
156            if not os.path.exists(self.pkgdir):
157                os.mkdir(self.pkgdir)
158
159            results = os.path.join(self.autodir, 'results')
160            if not os.path.exists(results):
161                os.mkdir(results)
162
163            download = os.path.join(self.testdir, 'download')
164            if not os.path.exists(download):
165                os.mkdir(download)
166
167            if os.path.exists(self.resultdir):
168                utils.system('rm -rf ' + self.resultdir)
169            os.mkdir(self.resultdir)
170
171            os.mkdir(os.path.join(self.resultdir, 'debug'))
172            os.mkdir(os.path.join(self.resultdir, 'analysis'))
173
174            shutil.copyfile(self.control,
175                            os.path.join(self.resultdir, 'control'))
176
177
178        self.control = control
179        self.jobtag = jobtag
180        self.log_filename = self.DEFAULT_LOG_FILENAME
181        self.container = None
182
183        self.stdout = fd_stack.fd_stack(1, sys.stdout)
184        self.stderr = fd_stack.fd_stack(2, sys.stderr)
185
186        self._init_group_level()
187
188        self.config = config.config(self)
189        self.harness = harness.select(harness_type, self)
190        self.profilers = profilers.profilers(self)
191
192        try:
193            tool = self.config_get('boottool.executable')
194            self.bootloader = boottool.boottool(tool)
195        except:
196            pass
197
198        self.sysinfo.log_per_reboot_data()
199
200        if not cont:
201            self.record('START', None, None)
202            self._increment_group_level()
203
204        self.harness.run_start()
205
206        if use_external_logging:
207            self.enable_external_logging()
208
209        # load the max disk usage rate - default to no monitoring
210        self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0)
211
212
213    def monitor_disk_usage(self, max_rate):
214        """\
215        Signal that the job should monitor disk space usage on /
216        and generate a warning if a test uses up disk space at a
217        rate exceeding 'max_rate'.
218
219        Parameters:
220             max_rate - the maximum allowed rate of disk consumption
221                        during a test, in MB/hour, or 0 to indicate
222                        no limit.
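
        For illustration only: job.monitor_disk_usage(500) would warn when a
        test consumes more than 500 MB/hour on /.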
223        """
224        self.set_state('__monitor_disk', max_rate)
225        self.max_disk_usage_rate = max_rate
226
227
228    def relative_path(self, path):
229        """\
230        Return a path relative to the job results directory
231        """
232        head = len(self.resultdir) + 1     # remove the / in between
233        return path[head:]
234
235
236    def control_get(self):
237        return self.control
238
239
240    def control_set(self, control):
241        self.control = os.path.abspath(control)
242
243
244    def harness_select(self, which):
245        self.harness = harness.select(which, self)
246
247
248    def config_set(self, name, value):
249        self.config.set(name, value)
250
251
252    def config_get(self, name):
253        return self.config.get(name)
254
255
256    def setup_dirs(self, results_dir, tmp_dir):
257        if not tmp_dir:
258            tmp_dir = os.path.join(self.tmpdir, 'build')
259        if not os.path.exists(tmp_dir):
260            os.mkdir(tmp_dir)
261        if not os.path.isdir(tmp_dir):
262            e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
263            raise ValueError(e_msg)
264
265        # We label the first build "build" and then subsequent ones
266        # as "build.2", "build.3", etc. Whilst this is a little bit
267        # inconsistent, 99.9% of jobs will only have one build
268        # (kernbench, sparse and buildtest being the exceptions),
269        # so it works out much cleaner. One of life's compromises.
270        if not results_dir:
271            results_dir = os.path.join(self.resultdir, 'build')
272            i = 2
273            while os.path.exists(results_dir):
274                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
275                i += 1
276        if not os.path.exists(results_dir):
277            os.mkdir(results_dir)
278
279        return (results_dir, tmp_dir)
280
281
282    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
283                            kjob = None ):
284        """Summon a xen object"""
285        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
286        build_dir = 'xen'
287        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
288                       leave, kjob)
289
290
291    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
292        """Summon a kernel object"""
293        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
294        build_dir = 'linux'
295        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
296                                  build_dir, leave)
297
298
299    def barrier(self, *args, **kwds):
300        """Create a barrier object"""
301        return barrier.barrier(*args, **kwds)
302
303
304    def install_pkg(self, name, pkg_type, install_dir):
305        '''
306        This method is a simple wrapper around the actual package
307        installation method in the Packager class. This is used
308        internally by the profilers, deps and tests code.
309        name : name of the package (ex: sleeptest, dbench etc.)
310        pkg_type : Type of the package (ex: test, dep etc.)
311        install_dir : The directory into which the package source is
312                      untarred (ex: client/profilers/<name> for profilers)
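
        For illustration only (the path is hypothetical):
            job.install_pkg('dbench', 'test', '/usr/local/autotest/tests/dbench')
        fetches and untars the dbench test into that directory, provided one
        or more package repositories have been configured.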
313        '''
314        if len(self.pkgmgr.repo_urls) > 0:
315            self.pkgmgr.install_pkg(name, pkg_type,
316                                    self.pkgdir, install_dir)
317
318
319    def add_repository(self, repo_urls):
320        '''
321        Adds the repository locations to the job so that packages
322        can be fetched from them when needed. The repository list
323        must be a list of URL strings.
324        Ex: job.add_repository(['http://blah1','http://blah2'])
325        '''
326        # TODO(aganti): Validate the list of the repository URLs.
327        repositories = repo_urls + self.pkgmgr.repo_urls
328        self.pkgmgr = packages.PackageManager(
329            self.autodir, repo_urls=repositories,
330            run_function_dargs={'timeout':1800})
331        # Fetch the packages' checksum file that contains the checksums
332        # of all the packages if it is not already fetched. The checksum
333        # is always fetched whenever a job is first started. This
334        # is not done in the job's constructor as we don't have the list of
335        # the repositories there (and obviously don't care about this file
336        # if we are not using the repos)
337        try:
338            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
339                                              packages.CHECKSUM_FILE)
340            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE, checksum_file_path,
341                                  use_checksum=False)
342        except packages.PackageFetchError, e:
343            # packaging system might not be working in this case
344            # Silently fall back to the normal case
345            pass
346
347
348    def require_gcc(self):
349        """
350        Raise NotAvailableError if gcc is not installed on the machine.
351        """
352        # check if gcc is installed on the system.
353        try:
354            utils.system('which gcc')
355        except error.CmdError, e:
356            raise NotAvailableError('gcc is required by this job and is '
357                                    'not available on the system')
358
359
360    def setup_dep(self, deps):
361        """Set up the dependencies for this test.
362        deps is a list of libraries required for this test.
363        """
364        # Fetch the deps from the repositories and set them up.
365        for dep in deps:
366            dep_dir = os.path.join(self.autodir, 'deps', dep)
367            # Search for the dependency in the repositories if specified,
368            # else check locally.
369            try:
370                self.install_pkg(dep, 'dep', dep_dir)
371            except packages.PackageInstallError:
372                # see if the dep is there locally
373                pass
374
375            # dep_dir might not exist if it is not fetched from the repos
376            if not os.path.exists(dep_dir):
377                raise error.TestError("Dependency %s does not exist" % dep)
378
379            os.chdir(dep_dir)
380            utils.system('./' + dep + '.py')
381
382
383    def _runtest(self, url, tag, args, dargs):
384        try:
385            try:
386                l = lambda : test.runtest(self, url, tag, args, dargs)
387                pid = parallel.fork_start(self.resultdir, l)
388                parallel.fork_waitfor(self.resultdir, pid)
389            except error.TestBaseException:
390                # These are already classified with an error type (exit_status)
391                raise
392            except error.JobError:
393                raise  # Caught further up and turned into an ABORT.
394            except Exception, e:
395                # Converts all other exceptions thrown by the test regardless
396                # of phase into a TestError(TestBaseException) subclass that
397                # reports them with their full stack trace.
398                raise error.UnhandledTestError(e)
399        finally:
400            # Reset the logging level to client level
401            debug.configure(module='client')
402
403
404    @_run_test_complete_on_exit
405    def run_test(self, url, *args, **dargs):
406        """Summon a test object and run it.
407
408        url
409                url of the test to run
410        tag
411                optional tag to append to the test name (passed as a keyword)
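
        For illustration only (test and tag names are hypothetical), a control
        file might call:

                job.run_test('sleeptest', tag='quick')

        which, with no other tags or run numbers set, is reported as
        "sleeptest.quick" and writes its results under
        <resultdir>/sleeptest.quick.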
412        """
413
414        if not url:
415            raise TypeError("Test name is invalid. "
416                            "Switched arguments?")
417        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
418        namelen = len(testname)
419        dargs = dargs.copy()
420        tntag = dargs.pop('tag', None)
421        if tntag:         # per-test tag  is included in reported test name
422            testname += '.' + str(tntag)
423        run_number = self.get_run_number()
424        if run_number:
425            testname += '._%02d_' % run_number
426            self.set_run_number(run_number + 1)
427        if self._is_kernel_in_test_tag():
428            testname += '.' + platform.release()
429        if self._test_tag_prefix:
430            testname += '.' + self._test_tag_prefix
431        if self.testtag:  # job-level tag is included in reported test name
432            testname += '.' + self.testtag
433        subdir = testname
434        sdtag = dargs.pop('subdir_tag', None)
435        if sdtag:  # subdir-only tag is not included in reports
436            subdir = subdir + '.' + str(sdtag)
437        tag = subdir[namelen+1:]    # '' if none
438
439        outputdir = os.path.join(self.resultdir, subdir)
440        if os.path.exists(outputdir):
441            msg = ("%s already exists, test <%s> may have"
442                    " already run with tag <%s>"
443                    % (outputdir, testname, tag) )
444            raise error.TestError(msg)
445        # NOTE: client/common_lib/test.py runtest() depends on directory
446        # names being constructed the same way as in this code.
447        os.mkdir(outputdir)
448
449        container = dargs.pop('container', None)
450        if container:
451            cname = container.get('name', None)
452            if not cname:   # get old name
453                cname = container.get('container_name', None)
454            mbytes = container.get('mbytes', None)
455            if not mbytes:  # get old name
456                mbytes = container.get('mem', None)
457            cpus  = container.get('cpus', None)
458            if not cpus:    # get old name
459                cpus  = container.get('cpu', None)
460            root  = container.get('root', None)
461            network = container.get('network', None)
462            disk = container.get('disk', None)
463            try:
464                self.new_container(mbytes=mbytes, cpus=cpus, root=root,
465                                   name=cname, network=network, disk=disk)
466            except cpuset.CpusetsNotAvailable, e:
467                self.record("START", subdir, testname)
468                self._increment_group_level()
469                self.record("ERROR", subdir, testname, str(e))
470                self._decrement_group_level()
471                self.record("END ERROR", subdir, testname)
472                return False
473            # We are running in a container now...
474
475        def log_warning(reason):
476            self.record("WARN", subdir, testname, reason)
477        @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate)
478        def group_func():
479            try:
480                self._runtest(url, tag, args, dargs)
481            except error.TestBaseException, detail:
482                # The error is already classified, record it properly.
483                self.record(detail.exit_status, subdir, testname, str(detail))
484                raise
485            else:
486                self.record('GOOD', subdir, testname, 'completed successfully')
487
488        try:
489            self._rungroup(subdir, testname, group_func)
490            if container:
491                self.release_container()
492            return True
493        except error.TestBaseException:
494            return False
495        # Any other exception here will be given to the caller
496        #
497        # NOTE: The only exception possible from the control file here
498        # is error.JobError as _runtest() turns all others into an
499        # UnhandledTestError that is caught above.
500
501
502    def _rungroup(self, subdir, testname, function, *args, **dargs):
503        """\
504        subdir:
505                name of the group
506        testname:
507                name of the test to run, or support step
508        function:
509                subroutine to run
510        *args:
511                arguments for the function
512
513        Returns the result of the passed in function
514        """
515
516        try:
517            self.record('START', subdir, testname)
518            self._increment_group_level()
519            result = function(*args, **dargs)
520            self._decrement_group_level()
521            self.record('END GOOD', subdir, testname)
522            return result
523        except error.TestBaseException, e:
524            self._decrement_group_level()
525            self.record('END %s' % e.exit_status, subdir, testname)
526            raise
527        except error.JobError, e:
528            self._decrement_group_level()
529            self.record('END ABORT', subdir, testname)
530            raise
531        except Exception, e:
532            # This should only ever happen due to a bug in the given
533            # function's code.  The common case of being called by
534            # run_test() will never reach this.  If a control file called
535            # run_group() itself, bugs in its function will be caught
536            # here.
537            self._decrement_group_level()
538            err_msg = str(e) + '\n' + traceback.format_exc()
539            self.record('END ERROR', subdir, testname, err_msg)
540            raise
541
542
543    def run_group(self, function, tag=None, **dargs):
544        """
545        Run a function nested within a group level.
546
547        function:
548                Callable to run.
549        tag:
550                An optional tag name for the group.  If None (default)
551                function.__name__ will be used.
552        **dargs:
553                Named arguments for the function.
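
        For illustration only (the function name is hypothetical):

                job.run_group(build_and_boot, tag='build', version='2.6.18')

        records START / END entries around the call just as a test group
        would, passing version='2.6.18' through to build_and_boot.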
554        """
555        if tag:
556            name = tag
557        else:
558            name = function.__name__
559
560        try:
561            return self._rungroup(subdir=None, testname=name,
562                                  function=function, **dargs)
563        except (SystemExit, error.TestBaseException):
564            raise
565        # If there was a different exception, turn it into a TestError.
566        # It will be caught by step_engine or _run_step_fn.
567        except Exception, e:
568            raise error.UnhandledTestError(e)
569
570
571    _RUN_NUMBER_STATE = '__run_number'
572    def get_run_number(self):
573        """Get the run number or 0 if no run number has been set."""
574        return self.get_state(self._RUN_NUMBER_STATE, default=0)
575
576
577    def set_run_number(self, number):
578        """If the run number is non-zero it will be in the output dir name."""
579        self.set_state(self._RUN_NUMBER_STATE, number)
580
581
582    _KERNEL_IN_TAG_STATE = '__kernel_version_in_test_tag'
583    def _is_kernel_in_test_tag(self):
584        """Boolean: should the kernel version be included in the test tag."""
585        return self.get_state(self._KERNEL_IN_TAG_STATE, default=False)
586
587
588    def show_kernel_in_test_tag(self, value=True):
589        """If True, the kernel version at test time will prefix test tags."""
590        self.set_state(self._KERNEL_IN_TAG_STATE, value)
591
592
593    def set_test_tag(self, tag=''):
594        """Set a tag to be added to the test name of all following run_test steps."""
595        self.testtag = tag
596
597
598    def set_test_tag_prefix(self, prefix):
599        """Set a prefix string to prepend to all future run_test steps.
600
601        Args:
602          prefix: A string to prepend to any run_test() step tags separated by
603              a '.'; use the empty string '' to clear it.
604        """
605        self._test_tag_prefix = prefix
606
607
608    def new_container(self, mbytes=None, cpus=None, root=None, name=None,
609                      network=None, disk=None, kswapd_merge=False):
610        if not utils.grep('cpuset', '/proc/filesystems'):
611            print "Containers not enabled by latest reboot"
612            return  # containers weren't enabled in this kernel boot
613        pid = os.getpid()
614        if not name:
615            name = 'test%d' % pid  # make arbitrary unique name
616        self.container = cpuset.cpuset(name, job_size=mbytes, job_pid=pid,
617                                       cpus=cpus, root=root, network=network,
618                                       disk=disk, kswapd_merge=kswapd_merge)
619        # This job's python shell is now running in the new container
620        # and all forked test processes will inherit that container
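        #
        # For illustration only (values are hypothetical): a control file
        # could put the rest of the job into a 512 MB container with
        #     job.new_container(mbytes=512, name='mytest')
        # or ask run_test() to do the same for a single test by passing
        #     container={'mbytes': 512, 'name': 'mytest'}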
621
622
623    def release_container(self):
624        if self.container:
625            self.container = self.container.release()
626
627
628    def cpu_count(self):
629        if self.container:
630            return len(self.container.cpus)
631        return utils.count_cpus()  # use total system count
632
633
634    def start_reboot(self):
635        self.record('START', None, 'reboot')
636        self._increment_group_level()
637        self.record('GOOD', None, 'reboot.start')
638
639
640    def end_reboot(self, subdir, kernel, patches):
641        kernel_info = {"kernel": kernel}
642        for i, patch in enumerate(patches):
643            kernel_info["patch%d" % i] = patch
644        self._decrement_group_level()
645        self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)
646
647
648    def end_reboot_and_verify(self, expected_when, expected_id, subdir,
649                              type='src', patches=[]):
650        """ Check the passed kernel identifier against the command line
651            and the running kernel, abort the job on mismatch. """
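
        # For illustration only: the kernel command line is expected to carry
        # an IDENT=<mark> argument (e.g. "... ro IDENT=2"); that mark is
        # compared against expected_when below, and the kernel identity
        # string against expected_id.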
652
653        print (("POST BOOT: checking booted kernel " +
654                "mark=%d identity='%s' type='%s'") %
655               (expected_when, expected_id, type))
656
657        running_id = utils.running_os_ident()
658
659        cmdline = utils.read_one_line("/proc/cmdline")
660
661        find_sum = re.compile(r'.*IDENT=(\d+)')
662        m = find_sum.match(cmdline)
663        cmdline_when = -1
664        if m:
665            cmdline_when = int(m.groups()[0])
666
667        # We have all the facts, see if they indicate we
668        # booted the requested kernel or not.
669        bad = False
670        if ((type == 'src' and expected_id != running_id) or
671            (type == 'rpm' and
672             not running_id.startswith(expected_id + '::'))):
673            print "check_kernel_ident: kernel identifier mismatch"
674            bad = True
675        if expected_when != cmdline_when:
676            print "check_kernel_ident: kernel command line mismatch"
677            bad = True
678
679        if bad:
680            print "   Expected Ident: " + expected_id
681            print "    Running Ident: " + running_id
682            print "    Expected Mark: %d" % (expected_when)
683            print "Command Line Mark: %d" % (cmdline_when)
684            print "     Command Line: " + cmdline
685
686            self.record("ABORT", subdir, "reboot.verify", "boot failure")
687            self._decrement_group_level()
688            kernel = {"kernel": running_id.split("::")[0]}
689            self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
690            raise error.JobError("reboot returned with the wrong kernel")
691
692        self.record('GOOD', subdir, 'reboot.verify', expected_id)
693        self.end_reboot(subdir, expected_id, patches)
694
695
696    def filesystem(self, device, mountpoint=None, loop_size=0):
697        if not mountpoint:
698            mountpoint = self.tmpdir
699        return partition.partition(self, device, loop_size)
700
701
702    def enable_external_logging(self):
703        pass
704
705
706    def disable_external_logging(self):
707        pass
708
709
710    def enable_test_cleanup(self):
711        """ Make tests run test.cleanup by default. """
712        self.set_state("__run_test_cleanup", True)
713        self.run_test_cleanup = True
714
715
716    def disable_test_cleanup(self):
717        """ Make tests skip test.cleanup by default. """
718        self.set_state("__run_test_cleanup", False)
719        self.run_test_cleanup = False
720
721
722    def default_test_cleanup(self, val):
723        if not self._is_continuation:
724            self.set_state("__run_test_cleanup", val)
725            self.run_test_cleanup = val
726
727
728    def default_boot_tag(self, tag):
729        if not self._is_continuation:
730            self.set_state("__last_boot_tag", tag)
731            self.last_boot_tag = tag
732
733
734    def reboot_setup(self):
735        pass
736
737
738    def reboot(self, tag=LAST_BOOT_TAG):
739        if tag == LAST_BOOT_TAG:
740            tag = self.last_boot_tag
741        else:
742            self.set_state("__last_boot_tag", tag)
743            self.last_boot_tag = tag
744
745        self.reboot_setup()
746        self.harness.run_reboot()
747        default = self.config_get('boot.set_default')
748        if default:
749            self.bootloader.set_default(tag)
750        else:
751            self.bootloader.boot_once(tag)
752
753        # HACK: the netconsole module sometimes hangs shutdown, so if it's
754        # loaded unload it first
755        utils.system("modprobe -r netconsole", ignore_status=True)
756
757        cmd = "(sleep 5; reboot) </dev/null >/dev/null 2>&1 &"
758        utils.system(cmd)
759        self.quit()
760
761
762    def noop(self, text):
763        print "job: noop: " + text
764
765
766    @_run_test_complete_on_exit
767    def parallel(self, *tasklist):
768        """Run tasks in parallel"""
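        # For illustration only (function names are hypothetical): each task
        # is a list or tuple of a callable followed by its arguments, e.g.
        #     job.parallel([run_client, 'machine1'], [run_client, 'machine2'])
        # Each task runs in its own forked process and logs to a separate
        # status file (status.0, status.1, ...) that is merged back below.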
769
770        pids = []
771        old_log_filename = self.log_filename
772        for i, task in enumerate(tasklist):
773            assert isinstance(task, (tuple, list))
774            self.log_filename = old_log_filename + (".%d" % i)
775            task_func = lambda task=task: task[0](*task[1:])  # bind current task
776            pids.append(parallel.fork_start(self.resultdir, task_func))
777
778        old_log_path = os.path.join(self.resultdir, old_log_filename)
779        old_log = open(old_log_path, "a")
780        exceptions = []
781        for i, pid in enumerate(pids):
782            # wait for the task to finish
783            try:
784                parallel.fork_waitfor(self.resultdir, pid)
785            except Exception, e:
786                exceptions.append(e)
787            # copy the logs from the subtask into the main log
788            new_log_path = old_log_path + (".%d" % i)
789            if os.path.exists(new_log_path):
790                new_log = open(new_log_path)
791                old_log.write(new_log.read())
792                new_log.close()
793                old_log.flush()
794                os.remove(new_log_path)
795        old_log.close()
796
797        self.log_filename = old_log_filename
798
799        # handle any exceptions raised by the parallel tasks
800        if exceptions:
801            msg = "%d task(s) failed in job.parallel" % len(exceptions)
802            raise error.JobError(msg)
803
804
805    def quit(self):
806        # XXX: should have a better name.
807        self.harness.run_pause()
808        raise error.JobContinue("more to come")
809
810
811    def complete(self, status):
812        """Clean up and exit"""
813        # We are about to exit 'complete' so clean up the control file.
814        dest = os.path.join(self.resultdir, os.path.basename(self.state_file))
815        os.rename(self.state_file, dest)
816
817        self.harness.run_complete()
818        self.disable_external_logging()
819        sys.exit(status)
820
821
822    def set_state(self, var, val):
823        # Deep copies make sure that the state can't be altered
824        # without it being re-written.  Perf wise, deep copies
825        # are overshadowed by pickling/loading.
826        self.state[var] = copy.deepcopy(val)
827        outfile = open(self.state_file, 'w')
828        try:
829            pickle.dump(self.state, outfile, pickle.HIGHEST_PROTOCOL)
830        finally:
831            outfile.close()
832        print "Persistent state variable %s now set to %r" % (var, val)
833
834
835    def _load_state(self):
836        if hasattr(self, 'state'):
837            raise RuntimeError('state already exists')
838        infile = None
839        try:
840            try:
841                infile = open(self.state_file, 'rb')
842                self.state = pickle.load(infile)
843            except IOError:
844                self.state = {}
845                initialize = True
846        finally:
847            if infile:
848                infile.close()
849
850        initialize = '__steps' not in self.state
851        if not (self._is_continuation or initialize):
852            raise RuntimeError('Found pre-existing state (__steps) for a '
853                               'job that is not a continuation.')
854
855        if initialize:
856            print 'Initializing the state engine.'
857            self.set_state('__steps', [])  # writes pickle file
858
859
860    def get_state(self, var, default=NO_DEFAULT):
861        if var in self.state or default == NO_DEFAULT:
862            val = self.state[var]
863        else:
864            val = default
865        return copy.deepcopy(val)
866
867
868    def __create_step_tuple(self, fn, args, dargs):
869        # Legacy code passes in an array where the first arg is
870        # the function or its name.
871        if isinstance(fn, list):
872            assert(len(args) == 0)
873            assert(len(dargs) == 0)
874            args = fn[1:]
875            fn = fn[0]
876        # Pickling actual functions is hairy, thus we have to call
877        # them by name.  Unfortunately, this means only functions
878        # defined globally can be used as a next step.
879        if callable(fn):
880            fn = fn.__name__
881        if not isinstance(fn, types.StringTypes):
882            raise StepError("Next steps must be functions or "
883                            "strings containing the function name")
884        ancestry = copy.copy(self.current_step_ancestry)
885        return (ancestry, fn, args, dargs)
886
887
888    def next_step_append(self, fn, *args, **dargs):
889        """Define the next step and place it at the end"""
890        steps = self.get_state('__steps')
891        steps.append(self.__create_step_tuple(fn, args, dargs))
892        self.set_state('__steps', steps)
893
894
895    def next_step(self, fn, *args, **dargs):
896        """Create a new step and place it after any steps added
897        while running the current step but before any steps added in
898        previous steps"""
899        steps = self.get_state('__steps')
900        steps.insert(self.next_step_index,
901                     self.__create_step_tuple(fn, args, dargs))
902        self.next_step_index += 1
903        self.set_state('__steps', steps)
904
905
906    def next_step_prepend(self, fn, *args, **dargs):
907        """Insert a new step, executing first"""
908        steps = self.get_state('__steps')
909        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
910        self.next_step_index += 1
911        self.set_state('__steps', steps)
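
    # For illustration only (function names are hypothetical), a control file
    # driven by the step engine might contain:
    #
    #     def step_one():
    #         job.run_test('sleeptest')
    #
    #     def step_init():
    #         job.next_step(step_one)
    #
    # Steps must be module-level functions in the control file (they are
    # stored by name), and the queued list is persisted in the job state so
    # that execution can resume at the right step after a reboot.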
912
913
914    def _run_step_fn(self, local_vars, fn, args, dargs):
915        """Run a (step) function within the given context"""
916
917        local_vars['__args'] = args
918        local_vars['__dargs'] = dargs
919        try:
920            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
921            return local_vars['__ret']
922        except SystemExit:
923            raise  # Send error.JobContinue and JobComplete on up to runjob.
924        except error.TestNAError, detail:
925            self.record(detail.exit_status, None, fn, str(detail))
926        except Exception, detail:
927            raise error.UnhandledJobError(detail)
928
929
930    def _create_frame(self, global_vars, ancestry, fn_name):
931        """Set up the environment like it would have been when this
932        function was first defined.
933
934        Child step engine 'implementations' must have 'return locals()'
935        at the end of their steps.  Because of this, we can call the
936        parent function and get back all child functions (i.e. those
937        defined within it).
938
939        Unfortunately, the call stack of the function calling
940        job.next_step might have been deeper than the function it
941        added.  In order to make sure that the environment is what it
942        should be, we need to then pop off the frames we built until
943        we find the frame where the function was first defined."""
944
945        # The copies ensure that the parent frames are not modified
946        # while building child frames.  This matters if we then
947        # pop some frames in the next part of this function.
948        current_frame = copy.copy(global_vars)
949        frames = [current_frame]
950        for steps_fn_name in ancestry:
951            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
952            current_frame = copy.copy(ret)
953            frames.append(current_frame)
954
955        # Walk up the stack frames until we find the place fn_name was defined.
956        while len(frames) > 2:
957            if fn_name not in frames[-2]:
958                break
959            if frames[-2][fn_name] != frames[-1][fn_name]:
960                break
961            frames.pop()
962            ancestry.pop()
963
964        return (frames[-1], ancestry)
965
966
967    def _add_step_init(self, local_vars, current_function):
968        """If the function returned a dictionary that includes a
969        function named 'step_init', prepend it to our list of steps.
970        This will only get run the first time a function with a nested
971        use of the step engine is run."""
972
973        if (isinstance(local_vars, dict) and
974            'step_init' in local_vars and
975            callable(local_vars['step_init'])):
976            # The init step is a child of the function
977            # we were just running.
978            self.current_step_ancestry.append(current_function)
979            self.next_step_prepend('step_init')
980
981
982    def step_engine(self):
983        """The multi-run engine used when the control file defines step_init.
984
985        Does the next step.
986        """
987
988        # Set up the environment and then interpret the control file.
989        # Some control files will have code outside of functions,
990        # which means we need to have our state engine initialized
991        # before reading in the file.
992        global_control_vars = {'job': self}
993        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
994        try:
995            execfile(self.control, global_control_vars, global_control_vars)
996        except error.TestNAError, detail:
997            self.record(detail.exit_status, None, self.control, str(detail))
998        except SystemExit:
999            raise  # Send error.JobContinue and JobComplete on up to runjob.
1000        except Exception, detail:
1001            # Syntax errors or other general Python exceptions coming out of
1002            # the top level of the control file itself go through here.
1003            raise error.UnhandledJobError(detail)
1004
1005        # If we loaded in a mid-job state file, then we presumably
1006        # know what steps we have yet to run.
1007        if not self._is_continuation:
1008            if 'step_init' in global_control_vars:
1009                self.next_step(global_control_vars['step_init'])
1010
1011        # Iterate through the steps.  If we reboot, we'll simply
1012        # continue iterating on the next step.
1013        while len(self.get_state('__steps')) > 0:
1014            steps = self.get_state('__steps')
1015            (ancestry, fn_name, args, dargs) = steps.pop(0)
1016            self.set_state('__steps', steps)
1017
1018            self.next_step_index = 0
1019            ret = self._create_frame(global_control_vars, ancestry, fn_name)
1020            local_vars, self.current_step_ancestry = ret
1021            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
1022            self._add_step_init(local_vars, fn_name)
1023
1024
1025    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1026        self._add_sysinfo_loggable(sysinfo.command(command, logfile),
1027                                   on_every_test)
1028
1029
1030    def add_sysinfo_logfile(self, file, on_every_test=False):
1031        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1032
1033
1034    def _add_sysinfo_loggable(self, loggable, on_every_test):
1035        if on_every_test:
1036            self.sysinfo.test_loggables.add(loggable)
1037        else:
1038            self.sysinfo.boot_loggables.add(loggable)
1039        self._save_sysinfo_state()
1040
1041
1042    def _load_sysinfo_state(self):
1043        state = self.get_state("__sysinfo", None)
1044        if state:
1045            self.sysinfo.deserialize(state)
1046
1047
1048    def _save_sysinfo_state(self):
1049        state = self.sysinfo.serialize()
1050        self.set_state("__sysinfo", state)
1051
1052
1053    def _init_group_level(self):
1054        self.group_level = self.get_state("__group_level", default=0)
1055
1056
1057    def _increment_group_level(self):
1058        self.group_level += 1
1059        self.set_state("__group_level", self.group_level)
1060
1061
1062    def _decrement_group_level(self):
1063        self.group_level -= 1
1064        self.set_state("__group_level", self.group_level)
1065
1066
1067    def record(self, status_code, subdir, operation, status = '',
1068               optional_fields=None):
1069        """
1070        Record job-level status
1071
1072        The intent is to make this file both machine parseable and
1073        human readable. That involves a little more complexity, but
1074        really isn't all that bad ;-)
1075
1076        Format is <status code>\t<subdir>\t<operation>\t<status>
1077
1078        status code: (GOOD|WARN|FAIL|ABORT)
1079                or   START
1080                or   END (GOOD|WARN|FAIL|ABORT)
1081
1082        subdir: MUST be a relevant subdirectory in the results,
1083        or None, which will be represented as '----'
1084
1085        operation: description of what you ran (e.g. "dbench", or
1086                                        "mkfs -t foobar /dev/sda9")
1087
1088        status: error message or "completed successfully"
1089
1090        ------------------------------------------------------------
1091
1092        Initial tabs indicate indent levels for grouping and are
1093        governed by self.group_level.
1094
1095        Multiline messages have secondary lines prefaced by a double
1096        space ('  ').
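
        As a purely illustrative sketch (field values are made up), a GOOD
        record for a test named "dbench" written at group level 1 would
        look like:

                \tGOOD\tdbench\tdbench\ttimestamp=...\tlocaltime=...\tcompleted successfully

        where the leading tab comes from the group level and the optional
        timestamp/localtime fields are added automatically.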
1097        """
1098
1099        if subdir:
1100            if re.search(r'[\n\t]', subdir):
1101                raise ValueError("Invalid character in subdir string")
1102            substr = subdir
1103        else:
1104            substr = '----'
1105
1106        if not log.is_valid_status(status_code):
1107            raise ValueError("Invalid status code supplied: %s" % status_code)
1108        if not operation:
1109            operation = '----'
1110
1111        if re.search(r'[\n\t]', operation):
1112            raise ValueError("Invalid character in operation string")
1113        operation = operation.rstrip()
1114
1115        if not optional_fields:
1116            optional_fields = {}
1117
1118        status = status.rstrip()
1119        status = re.sub(r"\t", "  ", status)
1120        # Mark any continuation lines so they can be detected in the
1121        # status file and it remains parsable.
1122        status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)
1123
1124        # Generate timestamps for inclusion in the logs
1125        epoch_time = int(time.time())  # seconds since epoch, in UTC
1126        local_time = time.localtime(epoch_time)
1127        optional_fields["timestamp"] = str(epoch_time)
1128        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
1129                                                     local_time)
1130
1131        fields = [status_code, substr, operation]
1132        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
1133        fields.append(status)
1134
1135        msg = '\t'.join(str(x) for x in fields)
1136        msg = '\t' * self.group_level + msg
1137
1138        msg_tag = ""
1139        if "." in self.log_filename:
1140            msg_tag = self.log_filename.split(".", 1)[1]
1141
1142        self.harness.test_status_detail(status_code, substr, operation, status,
1143                                        msg_tag)
1144        self.harness.test_status(msg, msg_tag)
1145
1146        # log to stdout (if enabled)
1147        #if self.log_filename == self.DEFAULT_LOG_FILENAME:
1148        print msg
1149
1150        # log to the "root" status log
1151        status_file = os.path.join(self.resultdir, self.log_filename)
1152        open(status_file, "a").write(msg + "\n")
1153
1154        # log to the subdir status log (if subdir is set)
1155        if subdir:
1156            dir = os.path.join(self.resultdir, subdir)
1157            status_file = os.path.join(dir, self.DEFAULT_LOG_FILENAME)
1158            open(status_file, "a").write(msg + "\n")
1159
1160
1161class disk_usage_monitor:
1162    def __init__(self, logging_func, device, max_mb_per_hour):
1163        self.func = logging_func
1164        self.device = device
1165        self.max_mb_per_hour = max_mb_per_hour
1166
1167
1168    def start(self):
1169        self.initial_space = utils.freespace(self.device)
1170        self.start_time = time.time()
1171
1172
1173    def stop(self):
1174        # if no maximum usage rate was set, we don't need to
1175        # generate any warnings
1176        if not self.max_mb_per_hour:
1177            return
1178
1179        final_space = utils.freespace(self.device)
1180        used_space = self.initial_space - final_space
1181        stop_time = time.time()
1182        total_time = stop_time - self.start_time
1183        # treat any run shorter than one minute as one minute, to keep
1184        # extremely short tests from generating false positives due to
1185        # short, badly timed bursts of activity
1186        total_time = max(total_time, 60.0)
1187
1188        # determine the usage rate
1189        bytes_per_sec = used_space / total_time
1190        mb_per_sec = bytes_per_sec / 1024**2
1191        mb_per_hour = mb_per_sec * 60 * 60
1192
1193        if mb_per_hour > self.max_mb_per_hour:
1194            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
1195            msg %= (self.device, mb_per_hour)
1196            self.func(msg)
1197
1198
1199    @classmethod
1200    def watch(cls, *monitor_args, **monitor_dargs):
1201        """ Generic decorator to wrap a function call with the
1202        standard create-monitor -> start -> call -> stop idiom."""
1203        def decorator(func):
1204            def watched_func(*args, **dargs):
1205                monitor = cls(*monitor_args, **monitor_dargs)
1206                monitor.start()
1207                try:
1208                    func(*args, **dargs)
1209                finally:
1210                    monitor.stop()
1211            return watched_func
1212        return decorator
1213
1214
1215def runjob(control, cont=False, tag="default", harness_type='',
1216           use_external_logging=False):
1217    """
1218    Run a job using the given control file.
1219
1220    This is the main interface to this module.
1221
1222    Args:
1223        control: The control file to use for this job.
1224        cont: Whether this is the continuation of a previously started job.
1225        tag: The job tag string.  ['default']
1226        harness_type: An alternative server harness.  ['']
1227        use_external_logging: Should external logging be enabled?  [False]
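
    For illustration only (the path is hypothetical):
        runjob('/usr/local/autotest/control', tag='mytag')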
1228    """
1229    control = os.path.abspath(control)
1230    state = control + '.state'
1231
1232    # instantiate the job object ready for the control file.
1233    myjob = None
1234    try:
1235        # Check that the control file is valid
1236        if not os.path.exists(control):
1237            raise error.JobError(control + ": control file not found")
1238
1239        # When continuing, the job is complete when there is no
1240        # state file; make sure we don't try to continue it.
1241        if cont and not os.path.exists(state):
1242            raise error.JobComplete("all done")
1243
1244        myjob = job(control, tag, cont, harness_type=harness_type,
1245                    use_external_logging=use_external_logging)
1246
1247        # Load in the user's control file; it may do any one of:
1248        #  1) execute in toto
1249        #  2) define steps, and select the first via next_step()
1250        myjob.step_engine()
1251
1252    except error.JobContinue:
1253        sys.exit(5)
1254
1255    except error.JobComplete:
1256        sys.exit(1)
1257
1258    except error.JobError, instance:
1259        print "JOB ERROR: " + instance.args[0]
1260        if myjob:
1261            command = None
1262            if len(instance.args) > 1:
1263                command = instance.args[1]
1264                myjob.record('ABORT', None, command, instance.args[0])
1265            myjob._decrement_group_level()
1266            myjob.record('END ABORT', None, None, instance.args[0])
1267            assert (myjob.group_level == 0), ('myjob.group_level must be 0,'
1268                                              ' not %d' % myjob.group_level)
1269            myjob.complete(1)
1270        else:
1271            sys.exit(1)
1272
1273    except Exception, e:
1274        # NOTE: job._run_step_fn and job.step_engine will turn things into
1275        # a JobError for us.  If we get here, it's likely an autotest bug.
1276        msg = str(e) + '\n' + traceback.format_exc()
1277        print "JOB ERROR (autotest bug?): " + msg
1278        if myjob:
1279            myjob._decrement_group_level()
1280            myjob.record('END ABORT', None, None, msg)
1281            assert(myjob.group_level == 0)
1282            myjob.complete(1)
1283        else:
1284            sys.exit(1)
1285
1286    # If we get here, then we assume the job is complete and good.
1287    myjob._decrement_group_level()
1288    myjob.record('END GOOD', None, None)
1289    assert(myjob.group_level == 0)
1290
1291    myjob.complete(0)
1292
1293
1294site_job = utils.import_site_class(
1295    __file__, "autotest_lib.client.bin.site_job", "site_job", base_job)
1296
1297class job(site_job, base_job):
1298    pass
1299