server_job.py revision 25c0b8cb56358f22dccf7fdc32dd1662787dc9ca
1"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
9import getpass, os, sys, re, stat, tempfile, time, select, subprocess, traceback
10import shutil, warnings
11from autotest_lib.client.bin import fd_stack, sysinfo
12from autotest_lib.client.common_lib import error, log, utils, packages
13from autotest_lib.server import test, subcommand, profilers
14from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
15
16
17def _control_segment_path(name):
18    """Get the pathname of the named control segment file."""
19    server_dir = os.path.dirname(os.path.abspath(__file__))
20    return os.path.join(server_dir, "control_segments", name)
21
22
# Filenames used inside a job's results directory.
CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

# Control segment scripts run around the main control file (see run()).
CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')

# Control segments used by the verify/repair entry points.
VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
35
36
37# by default provide a stub that generates no site data
38def _get_site_job_data_dummy(job):
39    return {}
40
41
# Load up site-specific code for generating site-specific job data;
# falls back to the dummy stub above when no site module is installed.
get_site_job_data = utils.import_site_function(__file__,
    "autotest_lib.server.site_job", "get_site_job_data",
    _get_site_job_data_dummy)
46
47
class base_server_job(object):
    """
    The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            site_testdir
                    <autodir>/server/site_tests/
            control
                    the control file for this job
            drop_caches_between_iterations
                    drop the pagecache between each iteration
    """

    # Version of the status-log format this job writes; consumed by the
    # TKO status parser (see init_parser).
    STATUS_VERSION = 1
72
73
    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
            Server side job object.

            Parameters:
                control:        The control file (pathname of)
                args:           args to pass to the control file
                resultdir:      where to throw the results
                label:          label for the job
                user:           Username for the job (email address)
                machines:       list of machine hostnames the job targets
                client:         True if a client-side control file
                parse_job:      job execution tag; when set (and the job has
                                exactly one machine) results are continuously
                                parsed into the TKO database
                ssh_user:       ssh username used to reach the machines
                ssh_port:       ssh port used to reach the machines
                ssh_pass:       ssh password used to reach the machines
        """
        # Derive the autotest directory layout from this file's location.
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir   = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir    = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            # Read the control file, stripping CR so CRLF files work too.
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = ''
        self.resultdir = resultdir
        if resultdir:
            if not os.path.exists(resultdir):
                os.mkdir(resultdir)
            self.debugdir = os.path.join(resultdir, 'debug')
            if not os.path.exists(self.debugdir):
                os.mkdir(self.debugdir)
            self.status = os.path.join(resultdir, 'status')
        else:
            self.status = None
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        # record_prefix holds the current indentation of status-log lines.
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        self.run_test_cleanup = True
        self.last_boot_tag = None
        self.hosts = set()
        self.drop_caches_between_iterations = False

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        if resultdir:
            self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)

        # Ensure we have a writable tmpdir; fall back to a per-user
        # directory under the system temp dir if the default is unusable.
        if not os.access(self.tmpdir, os.W_OK):
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                pass

        if not (os.access(self.tmpdir, os.W_OK) and os.path.isdir(self.tmpdir)):
            self.tmpdir = os.path.join(tempfile.gettempdir(),
                                       'autotest-' + getpass.getuser())
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                # If the problem was something other than the
                # directory already existing, this chmod should throw as well
                # exception.
                os.chmod(self.tmpdir, stat.S_IRWXU)

        # Start each run with a fresh status file.
        if self.status and os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        if self.resultdir:
            # Merge site-specific keyvals before writing the keyval file.
            job_data.update(get_site_job_data(self))
            utils.write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        # Continuous parsing is only set up here for single-machine jobs;
        # multi-machine jobs set it up per machine in parallel_simple().
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(self.autodir,
                                             run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')

        self.num_tests_run = 0
        self.num_tests_failed = 0

        self._register_subcommand_hooks()
177
178
179    def _register_subcommand_hooks(self):
180        """
181        Register some hooks into the subcommand modules that allow us
182        to properly clean up self.hosts created in forked subprocesses.
183        """
184        def on_fork(cmd):
185            self._existing_hosts_on_fork = set(self.hosts)
186        def on_join(cmd):
187            new_hosts = self.hosts - self._existing_hosts_on_fork
188            for host in new_hosts:
189                host.close()
190        subcommand.subcommand.register_fork_hook(on_fork)
191        subcommand.subcommand.register_join_hook(on_join)
192
193
194    def init_parser(self, resultdir):
195        """
196        Start the continuous parsing of resultdir. This sets up
197        the database connection and inserts the basic job object into
198        the database if necessary.
199        """
200        # redirect parser debugging to .parse.log
201        parse_log = os.path.join(resultdir, '.parse.log')
202        parse_log = open(parse_log, 'w', 0)
203        tko_utils.redirect_parser_debugging(parse_log)
204        # create a job model object and set up the db
205        self.results_db = tko_db.db(autocommit=True)
206        self.parser = status_lib.parser(self.STATUS_VERSION)
207        self.job_model = self.parser.make_job(resultdir)
208        self.parser.start(self.job_model)
209        # check if a job already exists in the db and insert it if
210        # it does not
211        job_idx = self.results_db.find_job(self.parse_job)
212        if job_idx is None:
213            self.results_db.insert_job(self.parse_job, self.job_model)
214        else:
215            machine_idx = self.results_db.lookup_machine(self.job_model.machine)
216            self.job_model.index = job_idx
217            self.job_model.machine_idx = machine_idx
218
219
220    def cleanup_parser(self):
221        """
222        This should be called after the server job is finished
223        to carry out any remaining cleanup (e.g. flushing any
224        remaining test results to the results db)
225        """
226        if not self.using_parser:
227            return
228        final_tests = self.parser.end()
229        for test in final_tests:
230            self.__insert_test(test)
231        self.using_parser = False
232
233
234    def verify(self):
235        if not self.machines:
236            raise error.AutoservError('No machines specified to verify')
237        if self.resultdir:
238            os.chdir(self.resultdir)
239        try:
240            namespace = {'machines' : self.machines, 'job' : self,
241                         'ssh_user' : self.ssh_user,
242                         'ssh_port' : self.ssh_port,
243                         'ssh_pass' : self.ssh_pass}
244            self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
245        except Exception, e:
246            msg = ('Verify failed\n' + str(e) + '\n' + traceback.format_exc())
247            self.record('ABORT', None, None, msg)
248            raise
249
250
251    def repair(self, host_protection):
252        if not self.machines:
253            raise error.AutoservError('No machines specified to repair')
254        if self.resultdir:
255            os.chdir(self.resultdir)
256        namespace = {'machines': self.machines, 'job': self,
257                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
258                     'ssh_pass': self.ssh_pass,
259                     'protection_level': host_protection}
260        # no matter what happens during repair (except if it succeeded in
261        # initiating hardware repair procedure), go on to try to reverify
262        try:
263            self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)
264        except error.AutoservHardwareRepairRequestedError:
265            raise
266        except Exception, exc:
267            print 'Exception occured during repair'
268            traceback.print_exc()
269
270        self.verify()
271
272
    def precheck(self):
        """
        Perform any additional pre-run checks; hook for derived classes.
        """
        pass
278
279
    def enable_external_logging(self):
        """
        Start or restart external logging mechanism; hook for derived
        classes (the base implementation does nothing).
        """
        pass
285
286
    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism; hook for derived
        classes (the base implementation does nothing).
        """
        pass
292
293
    def enable_test_cleanup(self):
        """
        Make tests run their cleanup phase (test.cleanup); this is the
        default behaviour.
        """
        self.run_test_cleanup = True
299
300
    def disable_test_cleanup(self):
        """
        Prevent tests from running their cleanup phase (test.cleanup).
        """
        self.run_test_cleanup = False
306
307
    def use_external_logging(self):
        """
        Return True if external logging should be used; the base
        implementation always returns False.
        """
        return False
313
314
    def parallel_simple(self, function, machines, log=True, timeout=None):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.
        """
        # A single machine matching the job's own machine list runs inline;
        # anything else goes through subcommand forking.
        is_forking = not (len(machines) == 1 and self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                # NOTE(review): these self mutations are presumably executed
                # in the forked child (see _register_subcommand_hooks), so
                # they should not affect the parent job object -- confirm
                # against subcommand.parallel_simple before relying on it.
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": machine})
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                # Give each machine its own results subdirectory.
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)
343
344
345    def run(self, cleanup=False, install_before=False, install_after=False,
346            collect_crashdumps=True, namespace={}):
347        # use a copy so changes don't affect the original dictionary
348        namespace = namespace.copy()
349        machines = self.machines
350
351        self.aborted = False
352        namespace['machines'] = machines
353        namespace['args'] = self.args
354        namespace['job'] = self
355        namespace['ssh_user'] = self.ssh_user
356        namespace['ssh_port'] = self.ssh_port
357        namespace['ssh_pass'] = self.ssh_pass
358        test_start_time = int(time.time())
359
360        if self.resultdir:
361            os.chdir(self.resultdir)
362
363            self.enable_external_logging()
364            status_log = os.path.join(self.resultdir, 'status.log')
365        collect_crashinfo = True
366        temp_control_file_dir = None
367        try:
368            if install_before and machines:
369                self._execute_code(INSTALL_CONTROL_FILE, namespace)
370            if self.resultdir:
371                server_control_file = SERVER_CONTROL_FILENAME
372                client_control_file = CLIENT_CONTROL_FILENAME
373            else:
374                temp_control_file_dir = tempfile.mkdtemp()
375                server_control_file = os.path.join(temp_control_file_dir,
376                                                   SERVER_CONTROL_FILENAME)
377                client_control_file = os.path.join(temp_control_file_dir,
378                                                   CLIENT_CONTROL_FILENAME)
379            if self.client:
380                namespace['control'] = self.control
381                utils.open_write_close(client_control_file, self.control)
382                shutil.copy(CLIENT_WRAPPER_CONTROL_FILE, server_control_file)
383            else:
384                namespace['utils'] = utils
385                utils.open_write_close(server_control_file, self.control)
386            self._execute_code(server_control_file, namespace)
387
388            # disable crashinfo collection if we get this far without error
389            collect_crashinfo = False
390        finally:
391            if temp_control_file_dir:
392                # Clean up temp. directory used for copies of the control files.
393                try:
394                    shutil.rmtree(temp_control_file_dir)
395                except Exception, e:
396                    print 'Error', e, 'removing dir', temp_control_file_dir
397            if machines and (collect_crashdumps or collect_crashinfo):
398                namespace['test_start_time'] = test_start_time
399                if collect_crashinfo:
400                    # includes crashdumps
401                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
402                else:
403                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
404            self.disable_external_logging()
405            if cleanup and machines:
406                self._execute_code(CLEANUP_CONTROL_FILE, namespace)
407            if install_after and machines:
408                self._execute_code(INSTALL_CONTROL_FILE, namespace)
409
410
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run

        Returns True on success, False when the test failed with a
        TestBaseException; any other exception is re-raised.
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + str(tag)
        subdir = testname

        # Refuse to clobber results from a previous run of the same test.
        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>" % (outputdir, testname, tag))
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            # Run the test, recording GOOD/FAIL/etc. around it.
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            # Re-raise non-test exceptions with their original traceback.
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True
455
456
    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.

        Records START/END status entries around the call and indents any
        nested records.  Returns (result, exc_info): exc_info is non-None
        only for TestBaseExceptions; any other exception is recorded as
        END ABORT and converted into a JobError.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                # Restore the indentation even if function raised.
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name)
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info
482
483
484    def run_group(self, function, *args, **dargs):
485        """\
486        function:
487                subroutine to run
488        *args:
489                arguments for the function
490        """
491
492        name = function.__name__
493
494        # Allow the tag for the group to be specified.
495        tag = dargs.pop('tag', None)
496        if tag:
497            name = tag
498
499        return self._run_group(name, None, function, *args, **dargs)[0]
500
501
502    def run_reboot(self, reboot_func, get_kernel_func):
503        """\
504        A specialization of run_group meant specifically for handling
505        a reboot. Includes support for capturing the kernel version
506        after the reboot.
507
508        reboot_func: a function that carries out the reboot
509
510        get_kernel_func: a function that returns a string
511        representing the kernel version.
512        """
513
514        old_record_prefix = self.record_prefix
515        try:
516            self.record('START', None, 'reboot')
517            self.record_prefix += '\t'
518            reboot_func()
519        except Exception, e:
520            self.record_prefix = old_record_prefix
521            err_msg = str(e) + '\n' + traceback.format_exc()
522            self.record('END FAIL', None, 'reboot', err_msg)
523        else:
524            kernel = get_kernel_func()
525            self.record_prefix = old_record_prefix
526            self.record('END GOOD', None, 'reboot',
527                        optional_fields={"kernel": kernel})
528
529
530    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
531        self._add_sysinfo_loggable(sysinfo.command(command, logfile),
532                                   on_every_test)
533
534
    def add_sysinfo_logfile(self, file, on_every_test=False):
        # Collect a log file as part of sysinfo gathering.
        # NOTE(review): the parameter name 'file' shadows the builtin but is
        # kept as-is for compatibility with keyword-argument callers.
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
537
538
539    def _add_sysinfo_loggable(self, loggable, on_every_test):
540        if on_every_test:
541            self.sysinfo.test_loggables.add(loggable)
542        else:
543            self.sysinfo.boot_loggables.add(loggable)
544
545
546    def record(self, status_code, subdir, operation, status='',
547               optional_fields=None):
548        """
549        Record job-level status
550
551        The intent is to make this file both machine parseable and
552        human readable. That involves a little more complexity, but
553        really isn't all that bad ;-)
554
555        Format is <status code>\t<subdir>\t<operation>\t<status>
556
557        status code: see common_lib.log.is_valid_status()
558                     for valid status definition
559
560        subdir: MUST be a relevant subdirectory in the results,
561        or None, which will be represented as '----'
562
563        operation: description of what you ran (e.g. "dbench", or
564                                        "mkfs -t foobar /dev/sda9")
565
566        status: error message or "completed sucessfully"
567
568        ------------------------------------------------------------
569
570        Initial tabs indicate indent levels for grouping, and is
571        governed by self.record_prefix
572
573        multiline messages have secondary lines prefaced by a double
574        space ('  ')
575
576        Executing this method will trigger the logging of all new
577        warnings to date from the various console loggers.
578        """
579        # poll all our warning loggers for new warnings
580        warnings = self._read_warnings()
581        for timestamp, msg in warnings:
582            self._record("WARN", None, None, msg, timestamp)
583
584        # write out the actual status log line
585        self._record(status_code, subdir, operation, status,
586                      optional_fields=optional_fields)
587
588
589    def _read_warnings(self):
590        warnings = []
591        while True:
592            # pull in a line of output from every logger that has
593            # output ready to be read
594            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
595            closed_loggers = set()
596            for logger in loggers:
597                line = logger.readline()
598                # record any broken pipes (aka line == empty)
599                if len(line) == 0:
600                    closed_loggers.add(logger)
601                    continue
602                timestamp, msg = line.split('\t', 1)
603                warnings.append((int(timestamp), msg.strip()))
604
605            # stop listening to loggers that are closed
606            self.warning_loggers -= closed_loggers
607
608            # stop if none of the loggers have any output left
609            if not loggers:
610                break
611
612        # sort into timestamp order
613        warnings.sort()
614        return warnings
615
616
617    def _render_record(self, status_code, subdir, operation, status='',
618                       epoch_time=None, record_prefix=None,
619                       optional_fields=None):
620        """
621        Internal Function to generate a record to be written into a
622        status log. For use by server_job.* classes only.
623        """
624        if subdir:
625            if re.match(r'[\n\t]', subdir):
626                raise ValueError('Invalid character in subdir string')
627            substr = subdir
628        else:
629            substr = '----'
630
631        if not log.is_valid_status(status_code):
632            raise ValueError('Invalid status code supplied: %s' % status_code)
633        if not operation:
634            operation = '----'
635        if re.match(r'[\n\t]', operation):
636            raise ValueError('Invalid character in operation string')
637        operation = operation.rstrip()
638        status = status.rstrip()
639        status = re.sub(r"\t", "  ", status)
640        # Ensure any continuation lines are marked so we can
641        # detect them in the status file to ensure it is parsable.
642        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)
643
644        if not optional_fields:
645            optional_fields = {}
646
647        # Generate timestamps for inclusion in the logs
648        if epoch_time is None:
649            epoch_time = int(time.time())
650        local_time = time.localtime(epoch_time)
651        optional_fields["timestamp"] = str(epoch_time)
652        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
653                                                     local_time)
654
655        fields = [status_code, substr, operation]
656        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
657        fields.append(status)
658
659        if record_prefix is None:
660            record_prefix = self.record_prefix
661
662        msg = '\t'.join(str(x) for x in fields)
663        return record_prefix + msg + '\n'
664
665
666    def _record_prerendered(self, msg):
667        """
668        Record a pre-rendered msg into the status logs. The only
669        change this makes to the message is to add on the local
670        indentation. Should not be called outside of server_job.*
671        classes. Unlike _record, this does not write the message
672        to standard output.
673        """
674        lines = []
675        status_file = os.path.join(self.resultdir, 'status.log')
676        status_log = open(status_file, 'a')
677        for line in msg.splitlines():
678            line = self.record_prefix + line + '\n'
679            lines.append(line)
680            status_log.write(line)
681        status_log.close()
682        self.__parse_status(lines)
683
684
    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with less surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrance of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
                 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing be part of a public API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
767
768
    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect_namespace is explicitly set to False, the dict will not
        be modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            # Write the machine list to .machines in the current directory.
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                # Missing/unreadable file: treat as needing a rewrite.
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)
800
801
802    def _record(self, status_code, subdir, operation, status='',
803                 epoch_time=None, optional_fields=None):
804        """
805        Actual function for recording a single line into the status
806        logs. Should never be called directly, only by job.record as
807        this would bypass the console monitor logging.
808        """
809
810        msg = self._render_record(status_code, subdir, operation, status,
811                                  epoch_time, optional_fields=optional_fields)
812
813
814        status_file = os.path.join(self.resultdir, 'status.log')
815        sys.stdout.write(msg)
816        open(status_file, "a").write(msg)
817        if subdir:
818            test_dir = os.path.join(self.resultdir, subdir)
819            status_file = os.path.join(test_dir, 'status.log')
820            open(status_file, "a").write(msg)
821        self.__parse_status(msg.splitlines())
822
823
824    def __parse_status(self, new_lines):
825        if not self.using_parser:
826            return
827        new_tests = self.parser.process_lines(new_lines)
828        for test in new_tests:
829            self.__insert_test(test)
830
831
832    def __insert_test(self, test):
833        """
834        An internal method to insert a new test result into the
835        database. This method will not raise an exception, even if an
836        error occurs during the insert, to avoid failing a test
837        simply because of unexpected database issues."""
838        self.num_tests_run += 1
839        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
840            self.num_tests_failed += 1
841        try:
842            self.results_db.insert_test(self.job_model, test)
843        except Exception:
844            msg = ("WARNING: An unexpected error occured while "
845                   "inserting test results into the database. "
846                   "Ignoring error.\n" + traceback.format_exc())
847            print >> sys.stderr, msg
848
849
# Obtain the site-specific job subclass, falling back to base_server_job
# when no site implementation is installed.
site_server_job = utils.import_site_class(
    __file__, "autotest_lib.server.site_server_job", "site_server_job",
    base_server_job)
853
class server_job(site_server_job, base_server_job):
    """Concrete server job class combining site and base behaviour."""
    pass
856