job.py revision c39fa9ada24f773079d6162383013827e4bc4292
1"""The main job wrapper
2
3This is the core infrastructure.
4
5Copyright Andy Whitcroft, Martin J. Bligh 2006
6"""
7
8import copy, os, platform, re, shutil, sys, time, traceback, types
9import cPickle as pickle
10from autotest_lib.client.bin import autotest_utils, parallel, kernel, xen
11from autotest_lib.client.bin import profilers, fd_stack, boottool, harness
12from autotest_lib.client.bin import config, sysinfo, cpuset, test, partition
13from autotest_lib.client.common_lib import error, barrier, log, utils
14from autotest_lib.client.common_lib import packages, debug
15
16LAST_BOOT_TAG = object()
17NO_DEFAULT = object()
18JOB_PREAMBLE = """
19from autotest_lib.client.common_lib.error import *
20from autotest_lib.client.common_lib.utils import *
21from autotest_lib.client.bin.autotest_utils import *
22"""
23
24
25class StepError(error.AutotestError):
26    pass
27
28class NotAvailableError(error.AutotestError):
29    pass
30
31
32
33def _run_test_complete_on_exit(f):
34    """Decorator for job methods that automatically calls
35    self.harness.run_test_complete when the method exits, if appropriate."""
36    def wrapped(self, *args, **dargs):
37        try:
38            return f(self, *args, **dargs)
39        finally:
40            if self.log_filename == self.DEFAULT_LOG_FILENAME:
41                self.harness.run_test_complete()
42                if self.drop_caches:
43                    print "Dropping caches"
44                    autotest_utils.drop_caches()
45    wrapped.__name__ = f.__name__
46    wrapped.__doc__ = f.__doc__
47    wrapped.__dict__.update(f.__dict__)
48    return wrapped
49
50
51class base_job(object):
52    """The actual job against which we do everything.
53
54    Properties:
55            autodir
56                    The top level autotest directory (/usr/local/autotest).
57                    Comes from os.environ['AUTODIR'].
58            bindir
59                    <autodir>/bin/
60            libdir
61                    <autodir>/lib/
62            testdir
63                    <autodir>/tests/
64            configdir
65                    <autodir>/config/
66            site_testdir
67                    <autodir>/site_tests/
68            profdir
69                    <autodir>/profilers/
70            tmpdir
71                    <autodir>/tmp/
72            pkgdir
73                    <autodir>/packages/
74            resultdir
75                    <autodir>/results/<jobtag>
76            toolsdir
77                    <autodir>/tools/
78            stdout
79                    fd_stack object for stdout
80            stderr
81                    fd_stack object for stderr
82            profilers
83                    the profilers object for this job
84            harness
85                    the server harness object for this job
86            config
87                    the job configuration for this job
88            drop_caches_between_iterations
89                    drop the pagecache between each iteration
90    """
91
92    DEFAULT_LOG_FILENAME = "status"
93
94    def __init__(self, control, jobtag, cont, harness_type=None,
95                 use_external_logging=False, drop_caches=True):
96        """
97        Prepare a client side job object.
98
99        Args:
100          control: The control file (pathname of).
101          jobtag: The job tag string (eg "default").
102          cont: If this is the continuation of this job.
103          harness_type: An alternative server harness.  [None]
104          use_external_logging: If true, the enable_external_logging
105                  method will be called during construction.  [False]
106          drop_caches: If true, autotest_utils.drop_caches() is
107                  called before and between all tests.  [True]
108        """
109        self.drop_caches = drop_caches
110        if self.drop_caches:
111            print "Dropping caches"
112            autotest_utils.drop_caches()
113        self.drop_caches_between_iterations = False
114        self.autodir = os.environ['AUTODIR']
115        self.bindir = os.path.join(self.autodir, 'bin')
116        self.libdir = os.path.join(self.autodir, 'lib')
117        self.testdir = os.path.join(self.autodir, 'tests')
118        self.configdir = os.path.join(self.autodir, 'config')
119        self.site_testdir = os.path.join(self.autodir, 'site_tests')
120        self.profdir = os.path.join(self.autodir, 'profilers')
121        self.tmpdir = os.path.join(self.autodir, 'tmp')
122        self.toolsdir = os.path.join(self.autodir, 'tools')
123        self.resultdir = os.path.join(self.autodir, 'results', jobtag)
124
125        self.control = os.path.realpath(control)
126        self._is_continuation = cont
127        self.state_file = self.control + '.state'
128        self.current_step_ancestry = []
129        self.next_step_index = 0
130        self.testtag = ''
131        self._test_tag_prefix = ''
132
133        self._load_state()
134        self.pkgmgr = packages.PackageManager(
135            self.autodir, run_function_dargs={'timeout':1800})
136        self.pkgdir = os.path.join(self.autodir, 'packages')
137        self.run_test_cleanup = self.get_state("__run_test_cleanup",
138                                                default=True)
139
140        self.sysinfo = sysinfo.sysinfo(self.resultdir)
141        self._load_sysinfo_state()
142
143        self.last_boot_tag = self.get_state("__last_boot_tag", default=None)
144        self.job_log = debug.get_logger(module='client')
145
146        if not cont:
147            """
148            Don't cleanup the tmp dir (which contains the lockfile)
149            in the constructor, this would be a problem for multiple
150            jobs starting at the same time on the same client. Instead
151            do the delete at the server side. We simply create the tmp
152            directory here if it does not already exist.
153            """
154            if not os.path.exists(self.tmpdir):
155                os.mkdir(self.tmpdir)
156
157            if not os.path.exists(self.pkgdir):
158                os.mkdir(self.pkgdir)
159
160            results = os.path.join(self.autodir, 'results')
161            if not os.path.exists(results):
162                os.mkdir(results)
163
164            download = os.path.join(self.testdir, 'download')
165            if not os.path.exists(download):
166                os.mkdir(download)
167
168            if os.path.exists(self.resultdir):
169                utils.system('rm -rf ' + self.resultdir)
170            os.mkdir(self.resultdir)
171
172            os.mkdir(os.path.join(self.resultdir, 'debug'))
173            os.mkdir(os.path.join(self.resultdir, 'analysis'))
174
175            shutil.copyfile(self.control,
176                            os.path.join(self.resultdir, 'control'))
177
178
179        self.control = control
180        self.jobtag = jobtag
181        self.log_filename = self.DEFAULT_LOG_FILENAME
182        self.container = None
183
184        self.stdout = fd_stack.fd_stack(1, sys.stdout)
185        self.stderr = fd_stack.fd_stack(2, sys.stderr)
186
187        self._init_group_level()
188
189        self.config = config.config(self)
190        self.harness = harness.select(harness_type, self)
191        self.profilers = profilers.profilers(self)
192
193        try:
194            tool = self.config_get('boottool.executable')
195            self.bootloader = boottool.boottool(tool)
196        except:
197            pass
198
199        self.sysinfo.log_per_reboot_data()
200
201        if not cont:
202            self.record('START', None, None)
203            self._increment_group_level()
204
205        self.harness.run_start()
206
207        if use_external_logging:
208            self.enable_external_logging()
209
210        # load the max disk usage rate - default to no monitoring
211        self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0)
212
213
214    def monitor_disk_usage(self, max_rate):
215        """\
216        Signal that the job should monitor disk space usage on /
217        and generate a warning if a test uses up disk space at a
218        rate exceeding 'max_rate'.
219
220        Parameters:
221             max_rate - the maximium allowed rate of disk consumption
222                        during a test, in MB/hour, or 0 to indicate
223                        no limit.
224        """
225        self.set_state('__monitor_disk', max_rate)
226        self.max_disk_usage_rate = max_rate
227
228
229    def relative_path(self, path):
230        """\
231        Return a patch relative to the job results directory
232        """
233        head = len(self.resultdir) + 1     # remove the / inbetween
234        return path[head:]
235
236
237    def control_get(self):
238        return self.control
239
240
241    def control_set(self, control):
242        self.control = os.path.abspath(control)
243
244
245    def harness_select(self, which):
246        self.harness = harness.select(which, self)
247
248
249    def config_set(self, name, value):
250        self.config.set(name, value)
251
252
253    def config_get(self, name):
254        return self.config.get(name)
255
256
257    def setup_dirs(self, results_dir, tmp_dir):
258        if not tmp_dir:
259            tmp_dir = os.path.join(self.tmpdir, 'build')
260        if not os.path.exists(tmp_dir):
261            os.mkdir(tmp_dir)
262        if not os.path.isdir(tmp_dir):
263            e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
264            raise ValueError(e_msg)
265
266        # We label the first build "build" and then subsequent ones
267        # as "build.2", "build.3", etc. Whilst this is a little bit
268        # inconsistent, 99.9% of jobs will only have one build
269        # (that's not done as kernbench, sparse, or buildtest),
270        # so it works out much cleaner. One of life's comprimises.
271        if not results_dir:
272            results_dir = os.path.join(self.resultdir, 'build')
273            i = 2
274            while os.path.exists(results_dir):
275                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
276                i += 1
277        if not os.path.exists(results_dir):
278            os.mkdir(results_dir)
279
280        return (results_dir, tmp_dir)
281
282
283    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
284                            kjob = None ):
285        """Summon a xen object"""
286        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
287        build_dir = 'xen'
288        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
289                       leave, kjob)
290
291
292    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
293        """Summon a kernel object"""
294        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
295        build_dir = 'linux'
296        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
297                                  build_dir, leave)
298
299
300    def barrier(self, *args, **kwds):
301        """Create a barrier object"""
302        return barrier.barrier(*args, **kwds)
303
304
305    def install_pkg(self, name, pkg_type, install_dir):
306        '''
307        This method is a simple wrapper around the actual package
308        installation method in the Packager class. This is used
309        internally by the profilers, deps and tests code.
310        name : name of the package (ex: sleeptest, dbench etc.)
311        pkg_type : Type of the package (ex: test, dep etc.)
312        install_dir : The directory in which the source is actually
313                      untarred into. (ex: client/profilers/<name> for profilers)
314        '''
315        if len(self.pkgmgr.repo_urls) > 0:
316            self.pkgmgr.install_pkg(name, pkg_type,
317                                    self.pkgdir, install_dir)
318
319
320    def add_repository(self, repo_urls):
321        '''
322        Adds the repository locations to the job so that packages
323        can be fetched from them when needed. The repository list
324        needs to be a string list
325        Ex: job.add_repository(['http://blah1','http://blah2'])
326        '''
327        # TODO(aganti): Validate the list of the repository URLs.
328        repositories = repo_urls + self.pkgmgr.repo_urls
329        self.pkgmgr = packages.PackageManager(
330            self.autodir, repo_urls=repositories,
331            run_function_dargs={'timeout':1800})
332        # Fetch the packages' checksum file that contains the checksums
333        # of all the packages if it is not already fetched. The checksum
334        # is always fetched whenever a job is first started. This
335        # is not done in the job's constructor as we don't have the list of
336        # the repositories there (and obviously don't care about this file
337        # if we are not using the repos)
338        try:
339            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
340                                              packages.CHECKSUM_FILE)
341            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE, checksum_file_path,
342                                  use_checksum=False)
343        except packages.PackageFetchError, e:
344            # packaging system might not be working in this case
345            # Silently fall back to the normal case
346            pass
347
348
349    def require_gcc(self):
350        """
351        Test whether gcc is installed on the machine.
352        """
353        # check if gcc is installed on the system.
354        try:
355            utils.system('which gcc')
356        except error.CmdError, e:
357            raise NotAvailableError('gcc is required by this job and is '
358                                    'not available on the system')
359
360
361    def setup_dep(self, deps):
362        """Set up the dependencies for this test.
363        deps is a list of libraries required for this test.
364        """
365        # Fetch the deps from the repositories and set them up.
366        for dep in deps:
367            dep_dir = os.path.join(self.autodir, 'deps', dep)
368            # Search for the dependency in the repositories if specified,
369            # else check locally.
370            try:
371                self.install_pkg(dep, 'dep', dep_dir)
372            except packages.PackageInstallError:
373                # see if the dep is there locally
374                pass
375
376            # dep_dir might not exist if it is not fetched from the repos
377            if not os.path.exists(dep_dir):
378                raise error.TestError("Dependency %s does not exist" % dep)
379
380            os.chdir(dep_dir)
381            utils.system('./' + dep + '.py')
382
383
384    def _runtest(self, url, tag, args, dargs):
385        try:
386            try:
387                l = lambda : test.runtest(self, url, tag, args, dargs)
388                pid = parallel.fork_start(self.resultdir, l)
389                parallel.fork_waitfor(self.resultdir, pid)
390            except error.TestBaseException:
391                # These are already classified with an error type (exit_status)
392                raise
393            except error.JobError:
394                raise  # Caught further up and turned into an ABORT.
395            except Exception, e:
396                # Converts all other exceptions thrown by the test regardless
397                # of phase into a TestError(TestBaseException) subclass that
398                # reports them with their full stack trace.
399                raise error.UnhandledTestError(e)
400        finally:
401            # Reset the logging level to client level
402            debug.configure(module='client')
403
404
405    @_run_test_complete_on_exit
406    def run_test(self, url, *args, **dargs):
407        """Summon a test object and run it.
408
409        tag
410                tag to add to testname
411        url
412                url of the test to run
413        """
414
415        if not url:
416            raise TypeError("Test name is invalid. "
417                            "Switched arguments?")
418        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
419        namelen = len(testname)
420        dargs = dargs.copy()
421        tntag = dargs.pop('tag', None)
422        if tntag:         # per-test tag  is included in reported test name
423            testname += '.' + str(tntag)
424        run_number = self.get_run_number()
425        if run_number:
426            testname += '._%02d_' % run_number
427            self.set_run_number(run_number + 1)
428        if self._is_kernel_in_test_tag():
429            testname += '.' + platform.release()
430        if self._test_tag_prefix:
431            testname += '.' + self._test_tag_prefix
432        if self.testtag:  # job-level tag is included in reported test name
433            testname += '.' + self.testtag
434        subdir = testname
435        sdtag = dargs.pop('subdir_tag', None)
436        if sdtag:  # subdir-only tag is not included in reports
437            subdir = subdir + '.' + str(sdtag)
438        tag = subdir[namelen+1:]    # '' if none
439
440        outputdir = os.path.join(self.resultdir, subdir)
441        if os.path.exists(outputdir):
442            msg = ("%s already exists, test <%s> may have"
443                    " already run with tag <%s>"
444                    % (outputdir, testname, tag) )
445            raise error.TestError(msg)
446        # NOTE: client/common_lib/test.py runtest() depends directory names
447        # being constructed the same way as in this code.
448        os.mkdir(outputdir)
449
450        container = dargs.pop('container', None)
451        if container:
452            cname = container.get('name', None)
453            if not cname:   # get old name
454                cname = container.get('container_name', None)
455            mbytes = container.get('mbytes', None)
456            if not mbytes:  # get old name
457                mbytes = container.get('mem', None)
458            cpus  = container.get('cpus', None)
459            if not cpus:    # get old name
460                cpus  = container.get('cpu', None)
461            root  = container.get('root', None)
462            network = container.get('network', None)
463            disk = container.get('disk', None)
464            try:
465                self.new_container(mbytes=mbytes, cpus=cpus, root=root,
466                                   name=cname, network=network, disk=disk)
467            except cpuset.CpusetsNotAvailable, e:
468                self.record("START", subdir, testname)
469                self._increment_group_level()
470                self.record("ERROR", subdir, testname, str(e))
471                self._decrement_group_level()
472                self.record("END ERROR", subdir, testname)
473                return False
474            # We are running in a container now...
475
476        def log_warning(reason):
477            self.record("WARN", subdir, testname, reason)
478        @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate)
479        def group_func():
480            try:
481                self._runtest(url, tag, args, dargs)
482            except error.TestBaseException, detail:
483                # The error is already classified, record it properly.
484                self.record(detail.exit_status, subdir, testname, str(detail))
485                raise
486            else:
487                self.record('GOOD', subdir, testname, 'completed successfully')
488
489        try:
490            self._rungroup(subdir, testname, group_func)
491            if container:
492                self.release_container()
493            return True
494        except error.TestBaseException:
495            return False
496        # Any other exception here will be given to the caller
497        #
498        # NOTE: The only exception possible from the control file here
499        # is error.JobError as _runtest() turns all others into an
500        # UnhandledTestError that is caught above.
501
502
503    def _rungroup(self, subdir, testname, function, *args, **dargs):
504        """\
505        subdir:
506                name of the group
507        testname:
508                name of the test to run, or support step
509        function:
510                subroutine to run
511        *args:
512                arguments for the function
513
514        Returns the result of the passed in function
515        """
516
517        try:
518            self.record('START', subdir, testname)
519            self._increment_group_level()
520            result = function(*args, **dargs)
521            self._decrement_group_level()
522            self.record('END GOOD', subdir, testname)
523            return result
524        except error.TestBaseException, e:
525            self._decrement_group_level()
526            self.record('END %s' % e.exit_status, subdir, testname, str(e))
527            raise
528        except error.JobError, e:
529            self._decrement_group_level()
530            self.record('END ABORT', subdir, testname, str(e))
531            raise
532        except Exception, e:
533            # This should only ever happen due to a bug in the given
534            # function's code.  The common case of being called by
535            # run_test() will never reach this.  If a control file called
536            # run_group() itself, bugs in its function will be caught
537            # here.
538            self._decrement_group_level()
539            err_msg = str(e) + '\n' + traceback.format_exc()
540            self.record('END ERROR', subdir, testname, err_msg)
541            raise
542
543
544    def run_group(self, function, tag=None, **dargs):
545        """
546        Run a function nested within a group level.
547
548        function:
549                Callable to run.
550        tag:
551                An optional tag name for the group.  If None (default)
552                function.__name__ will be used.
553        **dargs:
554                Named arguments for the function.
555        """
556        if tag:
557            name = tag
558        else:
559            name = function.__name__
560
561        try:
562            return self._rungroup(subdir=None, testname=name,
563                                  function=function, **dargs)
564        except (SystemExit, error.TestBaseException):
565            raise
566        # If there was a different exception, turn it into a TestError.
567        # It will be caught by step_engine or _run_step_fn.
568        except Exception, e:
569            raise error.UnhandledTestError(e)
570
571
572    _RUN_NUMBER_STATE = '__run_number'
573    def get_run_number(self):
574        """Get the run number or 0 if no run number has been set."""
575        return self.get_state(self._RUN_NUMBER_STATE, default=0)
576
577
578    def set_run_number(self, number):
579        """If the run number is non-zero it will be in the output dir name."""
580        self.set_state(self._RUN_NUMBER_STATE, number)
581
582
583    _KERNEL_IN_TAG_STATE = '__kernel_version_in_test_tag'
584    def _is_kernel_in_test_tag(self):
585        """Boolean: should the kernel version be included in the test tag."""
586        return self.get_state(self._KERNEL_IN_TAG_STATE, default=False)
587
588
589    def show_kernel_in_test_tag(self, value=True):
590        """If True, the kernel version at test time will prefix test tags."""
591        self.set_state(self._KERNEL_IN_TAG_STATE, value)
592
593
594    def set_test_tag(self, tag=''):
595        """Set tag to be added to test name of all following run_test steps."""
596        self.testtag = tag
597
598
599    def set_test_tag_prefix(self, prefix):
600        """Set a prefix string to prepend to all future run_test steps.
601
602        Args:
603          prefix: A string to prepend to any run_test() step tags separated by
604              a '.'; use the empty string '' to clear it.
605        """
606        self._test_tag_prefix = prefix
607
608
609    def new_container(self, mbytes=None, cpus=None, root=None, name=None,
610                      network=None, disk=None, kswapd_merge=False):
611        if not autotest_utils.grep('cpuset', '/proc/filesystems'):
612            print "Containers not enabled by latest reboot"
613            return  # containers weren't enabled in this kernel boot
614        pid = os.getpid()
615        if not name:
616            name = 'test%d' % pid  # make arbitrary unique name
617        self.container = cpuset.cpuset(name, job_size=mbytes, job_pid=pid,
618                                       cpus=cpus, root=root, network=network,
619                                       disk=disk, kswapd_merge=kswapd_merge)
620        # This job's python shell is now running in the new container
621        # and all forked test processes will inherit that container
622
623
624    def release_container(self):
625        if self.container:
626            self.container = self.container.release()
627
628
629    def cpu_count(self):
630        if self.container:
631            return len(self.container.cpus)
632        return autotest_utils.count_cpus()  # use total system count
633
634
635    def start_reboot(self):
636        self.record('START', None, 'reboot')
637        self._increment_group_level()
638        self.record('GOOD', None, 'reboot.start')
639
640
641    def end_reboot(self, subdir, kernel, patches):
642        kernel_info = {"kernel": kernel}
643        for i, patch in enumerate(patches):
644            kernel_info["patch%d" % i] = patch
645        self._decrement_group_level()
646        self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)
647
648
649    def end_reboot_and_verify(self, expected_when, expected_id, subdir,
650                              type='src', patches=[]):
651        """ Check the passed kernel identifier against the command line
652            and the running kernel, abort the job on missmatch. """
653
654        print (("POST BOOT: checking booted kernel " +
655                "mark=%d identity='%s' type='%s'") %
656               (expected_when, expected_id, type))
657
658        running_id = autotest_utils.running_os_ident()
659
660        cmdline = utils.read_one_line("/proc/cmdline")
661
662        find_sum = re.compile(r'.*IDENT=(\d+)')
663        m = find_sum.match(cmdline)
664        cmdline_when = -1
665        if m:
666            cmdline_when = int(m.groups()[0])
667
668        # We have all the facts, see if they indicate we
669        # booted the requested kernel or not.
670        bad = False
671        if (type == 'src' and expected_id != running_id or
672            type == 'rpm' and
673            not running_id.startswith(expected_id + '::')):
674            print "check_kernel_ident: kernel identifier mismatch"
675            bad = True
676        if expected_when != cmdline_when:
677            print "check_kernel_ident: kernel command line mismatch"
678            bad = True
679
680        if bad:
681            print "   Expected Ident: " + expected_id
682            print "    Running Ident: " + running_id
683            print "    Expected Mark: %d" % (expected_when)
684            print "Command Line Mark: %d" % (cmdline_when)
685            print "     Command Line: " + cmdline
686
687            self.record("ABORT", subdir, "reboot.verify", "boot failure")
688            self._decrement_group_level()
689            kernel = {"kernel": running_id.split("::")[0]}
690            self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)
691            raise error.JobError("reboot returned with the wrong kernel")
692
693        self.record('GOOD', subdir, 'reboot.verify', expected_id)
694        self.end_reboot(subdir, expected_id, patches)
695
696
697    def filesystem(self, device, mountpoint = None, loop_size = 0):
698        if not mountpoint:
699            mountpoint = self.tmpdir
700        return partition.partition(self, device, mountpoint, loop_size)
701
702
703    def enable_external_logging(self):
704        pass
705
706
707    def disable_external_logging(self):
708        pass
709
710
711    def enable_test_cleanup(self):
712        """ By default tests run test.cleanup """
713        self.set_state("__run_test_cleanup", True)
714        self.run_test_cleanup = True
715
716
717    def disable_test_cleanup(self):
718        """ By default tests do not run test.cleanup """
719        self.set_state("__run_test_cleanup", False)
720        self.run_test_cleanup = False
721
722
723    def default_test_cleanup(self, val):
724        if not self._is_continuation:
725            self.set_state("__run_test_cleanup", val)
726            self.run_test_cleanup = val
727
728
729    def default_boot_tag(self, tag):
730        if not self._is_continuation:
731            self.set_state("__last_boot_tag", tag)
732            self.last_boot_tag = tag
733
734
735    def reboot_setup(self):
736        pass
737
738
739    def reboot(self, tag=LAST_BOOT_TAG):
740        if tag == LAST_BOOT_TAG:
741            tag = self.last_boot_tag
742        else:
743            self.set_state("__last_boot_tag", tag)
744            self.last_boot_tag = tag
745
746        self.reboot_setup()
747        self.harness.run_reboot()
748        default = self.config_get('boot.set_default')
749        if default:
750            self.bootloader.set_default(tag)
751        else:
752            self.bootloader.boot_once(tag)
753
754        # HACK: using this as a module sometimes hangs shutdown, so if it's
755        # installed unload it first
756        utils.system("modprobe -r netconsole", ignore_status=True)
757
758        cmd = "(sleep 5; reboot) </dev/null >/dev/null 2>&1 &"
759        utils.system(cmd)
760        self.quit()
761
762
763    def noop(self, text):
764        print "job: noop: " + text
765
766
767    @_run_test_complete_on_exit
768    def parallel(self, *tasklist):
769        """Run tasks in parallel"""
770
771        pids = []
772        old_log_filename = self.log_filename
773        for i, task in enumerate(tasklist):
774            assert isinstance(task, (tuple, list))
775            self.log_filename = old_log_filename + (".%d" % i)
776            task_func = lambda: task[0](*task[1:])
777            pids.append(parallel.fork_start(self.resultdir, task_func))
778
779        old_log_path = os.path.join(self.resultdir, old_log_filename)
780        old_log = open(old_log_path, "a")
781        exceptions = []
782        for i, pid in enumerate(pids):
783            # wait for the task to finish
784            try:
785                parallel.fork_waitfor(self.resultdir, pid)
786            except Exception, e:
787                exceptions.append(e)
788            # copy the logs from the subtask into the main log
789            new_log_path = old_log_path + (".%d" % i)
790            if os.path.exists(new_log_path):
791                new_log = open(new_log_path)
792                old_log.write(new_log.read())
793                new_log.close()
794                old_log.flush()
795                os.remove(new_log_path)
796        old_log.close()
797
798        self.log_filename = old_log_filename
799
800        # handle any exceptions raised by the parallel tasks
801        if exceptions:
802            msg = "%d task(s) failed in job.parallel" % len(exceptions)
803            raise error.JobError(msg)
804
805
806    def quit(self):
807        # XXX: should have a better name.
808        self.harness.run_pause()
809        raise error.JobContinue("more to come")
810
811
812    def complete(self, status):
813        """Clean up and exit"""
814        # We are about to exit 'complete' so clean up the control file.
815        dest = os.path.join(self.resultdir, os.path.basename(self.state_file))
816        os.rename(self.state_file, dest)
817
818        self.harness.run_complete()
819        self.disable_external_logging()
820        sys.exit(status)
821
822
823    def set_state(self, var, val):
824        # Deep copies make sure that the state can't be altered
825        # without it being re-written.  Perf wise, deep copies
826        # are overshadowed by pickling/loading.
827        self.state[var] = copy.deepcopy(val)
828        outfile = open(self.state_file, 'w')
829        try:
830            pickle.dump(self.state, outfile, pickle.HIGHEST_PROTOCOL)
831        finally:
832            outfile.close()
833        print "Persistent state variable %s now set to %r" % (var, val)
834
835
836    def _load_state(self):
837        if hasattr(self, 'state'):
838            raise RuntimeError('state already exists')
839        infile = None
840        try:
841            try:
842                infile = open(self.state_file, 'rb')
843                self.state = pickle.load(infile)
844            except IOError:
845                self.state = {}
846                initialize = True
847        finally:
848            if infile:
849                infile.close()
850
851        initialize = '__steps' not in self.state
852        if not (self._is_continuation or initialize):
853            raise RuntimeError('Loaded state must contain __steps or be a '
854                               'continuation.')
855
856        if initialize:
857            print 'Initializing the state engine.'
858            self.set_state('__steps', [])  # writes pickle file
859
860
861    def get_state(self, var, default=NO_DEFAULT):
862        if var in self.state or default == NO_DEFAULT:
863            val = self.state[var]
864        else:
865            val = default
866        return copy.deepcopy(val)
867
868
869    def __create_step_tuple(self, fn, args, dargs):
870        # Legacy code passes in an array where the first arg is
871        # the function or its name.
872        if isinstance(fn, list):
873            assert(len(args) == 0)
874            assert(len(dargs) == 0)
875            args = fn[1:]
876            fn = fn[0]
877        # Pickling actual functions is hairy, thus we have to call
878        # them by name.  Unfortunately, this means only functions
879        # defined globally can be used as a next step.
880        if callable(fn):
881            fn = fn.__name__
882        if not isinstance(fn, types.StringTypes):
883            raise StepError("Next steps must be functions or "
884                            "strings containing the function name")
885        ancestry = copy.copy(self.current_step_ancestry)
886        return (ancestry, fn, args, dargs)
887
888
889    def next_step_append(self, fn, *args, **dargs):
890        """Define the next step and place it at the end"""
891        steps = self.get_state('__steps')
892        steps.append(self.__create_step_tuple(fn, args, dargs))
893        self.set_state('__steps', steps)
894
895
896    def next_step(self, fn, *args, **dargs):
897        """Create a new step and place it after any steps added
898        while running the current step but before any steps added in
899        previous steps"""
900        steps = self.get_state('__steps')
901        steps.insert(self.next_step_index,
902                     self.__create_step_tuple(fn, args, dargs))
903        self.next_step_index += 1
904        self.set_state('__steps', steps)
905
906
907    def next_step_prepend(self, fn, *args, **dargs):
908        """Insert a new step, executing first"""
909        steps = self.get_state('__steps')
910        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
911        self.next_step_index += 1
912        self.set_state('__steps', steps)
913
914
915    def _run_step_fn(self, local_vars, fn, args, dargs):
916        """Run a (step) function within the given context"""
917
918        local_vars['__args'] = args
919        local_vars['__dargs'] = dargs
920        try:
921            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
922            return local_vars['__ret']
923        except SystemExit:
924            raise  # Send error.JobContinue and JobComplete on up to runjob.
925        except error.TestNAError, detail:
926            self.record(detail.exit_status, None, fn, str(detail))
927        except Exception, detail:
928            raise error.UnhandledJobError(detail)
929
930
931    def _create_frame(self, global_vars, ancestry, fn_name):
932        """Set up the environment like it would have been when this
933        function was first defined.
934
935        Child step engine 'implementations' must have 'return locals()'
936        at end end of their steps.  Because of this, we can call the
937        parent function and get back all child functions (i.e. those
938        defined within it).
939
940        Unfortunately, the call stack of the function calling
941        job.next_step might have been deeper than the function it
942        added.  In order to make sure that the environment is what it
943        should be, we need to then pop off the frames we built until
944        we find the frame where the function was first defined."""
945
946        # The copies ensure that the parent frames are not modified
947        # while building child frames.  This matters if we then
948        # pop some frames in the next part of this function.
949        current_frame = copy.copy(global_vars)
950        frames = [current_frame]
951        for steps_fn_name in ancestry:
952            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
953            current_frame = copy.copy(ret)
954            frames.append(current_frame)
955
956        # Walk up the stack frames until we find the place fn_name was defined.
957        while len(frames) > 2:
958            if fn_name not in frames[-2]:
959                break
960            if frames[-2][fn_name] != frames[-1][fn_name]:
961                break
962            frames.pop()
963            ancestry.pop()
964
965        return (frames[-1], ancestry)
966
967
968    def _add_step_init(self, local_vars, current_function):
969        """If the function returned a dictionary that includes a
970        function named 'step_init', prepend it to our list of steps.
971        This will only get run the first time a function with a nested
972        use of the step engine is run."""
973
974        if (isinstance(local_vars, dict) and
975            'step_init' in local_vars and
976            callable(local_vars['step_init'])):
977            # The init step is a child of the function
978            # we were just running.
979            self.current_step_ancestry.append(current_function)
980            self.next_step_prepend('step_init')
981
982
983    def step_engine(self):
984        """The multi-run engine used when the control file defines step_init.
985
986        Does the next step.
987        """
988
989        # Set up the environment and then interpret the control file.
990        # Some control files will have code outside of functions,
991        # which means we need to have our state engine initialized
992        # before reading in the file.
993        global_control_vars = {'job': self}
994        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
995        try:
996            execfile(self.control, global_control_vars, global_control_vars)
997        except error.TestNAError, detail:
998            self.record(detail.exit_status, None, self.control, str(detail))
999        except SystemExit:
1000            raise  # Send error.JobContinue and JobComplete on up to runjob.
1001        except Exception, detail:
1002            # Syntax errors or other general Python exceptions coming out of
1003            # the top level of the control file itself go through here.
1004            raise error.UnhandledJobError(detail)
1005
1006        # If we loaded in a mid-job state file, then we presumably
1007        # know what steps we have yet to run.
1008        if not self._is_continuation:
1009            if 'step_init' in global_control_vars:
1010                self.next_step(global_control_vars['step_init'])
1011
1012        # Iterate through the steps.  If we reboot, we'll simply
1013        # continue iterating on the next step.
1014        while len(self.get_state('__steps')) > 0:
1015            steps = self.get_state('__steps')
1016            (ancestry, fn_name, args, dargs) = steps.pop(0)
1017            self.set_state('__steps', steps)
1018
1019            self.next_step_index = 0
1020            ret = self._create_frame(global_control_vars, ancestry, fn_name)
1021            local_vars, self.current_step_ancestry = ret
1022            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
1023            self._add_step_init(local_vars, fn_name)
1024
1025
1026    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1027        self._add_sysinfo_loggable(sysinfo.command(command, logfile),
1028                                   on_every_test)
1029
1030
1031    def add_sysinfo_logfile(self, file, on_every_test=False):
1032        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1033
1034
1035    def _add_sysinfo_loggable(self, loggable, on_every_test):
1036        if on_every_test:
1037            self.sysinfo.test_loggables.add(loggable)
1038        else:
1039            self.sysinfo.boot_loggables.add(loggable)
1040        self._save_sysinfo_state()
1041
1042
1043    def _load_sysinfo_state(self):
1044        state = self.get_state("__sysinfo", None)
1045        if state:
1046            self.sysinfo.deserialize(state)
1047
1048
1049    def _save_sysinfo_state(self):
1050        state = self.sysinfo.serialize()
1051        self.set_state("__sysinfo", state)
1052
1053
1054    def _init_group_level(self):
1055        self.group_level = self.get_state("__group_level", default=0)
1056
1057
1058    def _increment_group_level(self):
1059        self.group_level += 1
1060        self.set_state("__group_level", self.group_level)
1061
1062
1063    def _decrement_group_level(self):
1064        self.group_level -= 1
1065        self.set_state("__group_level", self.group_level)
1066
1067
1068    def record(self, status_code, subdir, operation, status = '',
1069               optional_fields=None):
1070        """
1071        Record job-level status
1072
1073        The intent is to make this file both machine parseable and
1074        human readable. That involves a little more complexity, but
1075        really isn't all that bad ;-)
1076
1077        Format is <status code>\t<subdir>\t<operation>\t<status>
1078
1079        status code: (GOOD|WARN|FAIL|ABORT)
1080                or   START
1081                or   END (GOOD|WARN|FAIL|ABORT)
1082
1083        subdir: MUST be a relevant subdirectory in the results,
1084        or None, which will be represented as '----'
1085
1086        operation: description of what you ran (e.g. "dbench", or
1087                                        "mkfs -t foobar /dev/sda9")
1088
1089        status: error message or "completed sucessfully"
1090
1091        ------------------------------------------------------------
1092
1093        Initial tabs indicate indent levels for grouping, and is
1094        governed by self.group_level
1095
1096        multiline messages have secondary lines prefaced by a double
1097        space ('  ')
1098        """
1099
1100        if subdir:
1101            if re.match(r'[\n\t]', subdir):
1102                raise ValueError("Invalid character in subdir string")
1103            substr = subdir
1104        else:
1105            substr = '----'
1106
1107        if not log.is_valid_status(status_code):
1108            raise ValueError("Invalid status code supplied: %s" % status_code)
1109        if not operation:
1110            operation = '----'
1111
1112        if re.match(r'[\n\t]', operation):
1113            raise ValueError("Invalid character in operation string")
1114        operation = operation.rstrip()
1115
1116        if not optional_fields:
1117            optional_fields = {}
1118
1119        status = status.rstrip()
1120        status = re.sub(r"\t", "  ", status)
1121        # Ensure any continuation lines are marked so we can
1122        # detect them in the status file to ensure it is parsable.
1123        status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)
1124
1125        # Generate timestamps for inclusion in the logs
1126        epoch_time = int(time.time())  # seconds since epoch, in UTC
1127        local_time = time.localtime(epoch_time)
1128        optional_fields["timestamp"] = str(epoch_time)
1129        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
1130                                                     local_time)
1131
1132        fields = [status_code, substr, operation]
1133        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
1134        fields.append(status)
1135
1136        msg = '\t'.join(str(x) for x in fields)
1137        msg = '\t' * self.group_level + msg
1138
1139        msg_tag = ""
1140        if "." in self.log_filename:
1141            msg_tag = self.log_filename.split(".", 1)[1]
1142
1143        self.harness.test_status_detail(status_code, substr, operation, status,
1144                                        msg_tag)
1145        self.harness.test_status(msg, msg_tag)
1146
1147        # log to stdout (if enabled)
1148        #if self.log_filename == self.DEFAULT_LOG_FILENAME:
1149        print msg
1150
1151        # log to the "root" status log
1152        status_file = os.path.join(self.resultdir, self.log_filename)
1153        open(status_file, "a").write(msg + "\n")
1154
1155        # log to the subdir status log (if subdir is set)
1156        if subdir:
1157            dir = os.path.join(self.resultdir, subdir)
1158            status_file = os.path.join(dir, self.DEFAULT_LOG_FILENAME)
1159            open(status_file, "a").write(msg + "\n")
1160
1161
1162class disk_usage_monitor:
1163    def __init__(self, logging_func, device, max_mb_per_hour):
1164        self.func = logging_func
1165        self.device = device
1166        self.max_mb_per_hour = max_mb_per_hour
1167
1168
1169    def start(self):
1170        self.initial_space = autotest_utils.freespace(self.device)
1171        self.start_time = time.time()
1172
1173
1174    def stop(self):
1175        # if no maximum usage rate was set, we don't need to
1176        # generate any warnings
1177        if not self.max_mb_per_hour:
1178            return
1179
1180        final_space = autotest_utils.freespace(self.device)
1181        used_space = self.initial_space - final_space
1182        stop_time = time.time()
1183        total_time = stop_time - self.start_time
1184        # round up the time to one minute, to keep extremely short
1185        # tests from generating false positives due to short, badly
1186        # timed bursts of activity
1187        total_time = max(total_time, 60.0)
1188
1189        # determine the usage rate
1190        bytes_per_sec = used_space / total_time
1191        mb_per_sec = bytes_per_sec / 1024**2
1192        mb_per_hour = mb_per_sec * 60 * 60
1193
1194        if mb_per_hour > self.max_mb_per_hour:
1195            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
1196            msg %= (self.device, mb_per_hour)
1197            self.func(msg)
1198
1199
1200    @classmethod
1201    def watch(cls, *monitor_args, **monitor_dargs):
1202        """ Generic decorator to wrap a function call with the
1203        standard create-monitor -> start -> call -> stop idiom."""
1204        def decorator(func):
1205            def watched_func(*args, **dargs):
1206                monitor = cls(*monitor_args, **monitor_dargs)
1207                monitor.start()
1208                try:
1209                    func(*args, **dargs)
1210                finally:
1211                    monitor.stop()
1212            return watched_func
1213        return decorator
1214
1215
1216def runjob(control, cont=False, tag="default", harness_type='',
1217           use_external_logging=False):
1218    """
1219    Run a job using the given control file.
1220
1221    This is the main interface to this module.
1222
1223    Args:
1224        control: The control file to use for this job.
1225        cont: Whether this is the continuation of a previously started job.
1226        tag: The job tag string.  ['default']
1227        harness_type: An alternative server harness.  [None]
1228        use_external_logging: Should external logging be enabled?  [False]
1229    """
1230    control = os.path.abspath(control)
1231    state = control + '.state'
1232
1233    # instantiate the job object ready for the control file.
1234    myjob = None
1235    try:
1236        # Check that the control file is valid
1237        if not os.path.exists(control):
1238            raise error.JobError(control + ": control file not found")
1239
1240        # When continuing, the job is complete when there is no
1241        # state file, ensure we don't try and continue.
1242        if cont and not os.path.exists(state):
1243            raise error.JobComplete("all done")
1244
1245        myjob = job(control, tag, cont, harness_type=harness_type,
1246                    use_external_logging=use_external_logging)
1247
1248        # Load in the users control file, may do any one of:
1249        #  1) execute in toto
1250        #  2) define steps, and select the first via next_step()
1251        myjob.step_engine()
1252
1253    except error.JobContinue:
1254        sys.exit(5)
1255
1256    except error.JobComplete:
1257        sys.exit(1)
1258
1259    except error.JobError, instance:
1260        print "JOB ERROR: " + instance.args[0]
1261        if myjob:
1262            command = None
1263            if len(instance.args) > 1:
1264                command = instance.args[1]
1265                myjob.record('ABORT', None, command, instance.args[0])
1266            myjob._decrement_group_level()
1267            myjob.record('END ABORT', None, None, instance.args[0])
1268            assert (myjob.group_level == 0), ('myjob.group_level must be 0,'
1269                                              ' not %d' % myjob.group_level)
1270            myjob.complete(1)
1271        else:
1272            sys.exit(1)
1273
1274    except Exception, e:
1275        # NOTE: job._run_step_fn and job.step_engine will turn things into
1276        # a JobError for us.  If we get here, its likely an autotest bug.
1277        msg = str(e) + '\n' + traceback.format_exc()
1278        print "JOB ERROR (autotest bug?): " + msg
1279        if myjob:
1280            myjob._decrement_group_level()
1281            myjob.record('END ABORT', None, None, msg)
1282            assert(myjob.group_level == 0)
1283            myjob.complete(1)
1284        else:
1285            sys.exit(1)
1286
1287    # If we get here, then we assume the job is complete and good.
1288    myjob._decrement_group_level()
1289    myjob.record('END GOOD', None, None)
1290    assert(myjob.group_level == 0)
1291
1292    myjob.complete(0)
1293
1294
1295# site_job.py may be non-existant or empty, make sure that an appropriate
1296# site_job class is created nevertheless
1297try:
1298    from autotest_lib.client.bin.site_job import site_job
1299except ImportError:
1300    class site_job(object):
1301        pass
1302
1303class job(site_job, base_job):
1304    pass
1305