server_job.py revision cf8d492a6eafdd03d88ad9dad676ae737ec898ec
1"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""

import getpass, os, sys, re, stat, tempfile, time, select, subprocess
import traceback, shutil, warnings, fcntl, pickle, logging
import itertools
from autotest_lib.client.bin import sysinfo
from autotest_lib.client.common_lib import error, log, utils, packages
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.server import test, subcommand, profilers
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils


def _control_segment_path(name):
    """Get the pathname of the named control segment file."""
    server_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(server_dir, "control_segments", name)


CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')

VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')


# by default provide a stub that generates no site data
def _get_site_job_data_dummy(job):
    return {}


# load up site-specific code for generating site-specific job data
get_site_job_data = utils.import_site_function(__file__,
    "autotest_lib.server.site_server_job", "get_site_job_data",
    _get_site_job_data_dummy)
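# A site implementation placed at autotest_lib/server/site_server_job.py can
# override this hook. A hypothetical sketch (the returned key is
# illustrative, not real site data):
#
#     def get_site_job_data(job):
#         return {'site_label': job.label}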


class base_server_job(object):
    """
    The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            site_testdir
                    <autodir>/server/site_tests/
            control
                    the control file for this job
            drop_caches_between_iterations
                    drop the pagecache between each iteration
            default_profile_only
                    default value for the test.execute profile_only parameter
    """

    STATUS_VERSION = 1

    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass='',
                 group_name='', tag=''):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param machines: A list of hostnames to run the job against.
        @param client: True if this is a client-side control file.
        @param parse_job: string, if supplied it is the job execution tag that
                the results will be passed through to the TKO parser with.
        @param ssh_user: The SSH username.  [root]
        @param ssh_port: The SSH port number.  [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler.  [optional]
        """
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir   = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir    = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            self.control = self._load_control_file(control)
        else:
            self.control = ''
        self.resultdir = resultdir
        self.uncollected_log_file = None
        if resultdir:
            self.uncollected_log_file = os.path.join(resultdir,
                                                     'uncollected_logs')
            self.debugdir = os.path.join(resultdir, 'debug')

            if not os.path.exists(resultdir):
                os.mkdir(resultdir)
            if not os.path.exists(self.debugdir):
                os.mkdir(self.debugdir)
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        self.tag = tag
        self.default_profile_only = False
        self.run_test_cleanup = True
        self.last_boot_tag = None
        self.hosts = set()
        self.drop_caches_between_iterations = False

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        if resultdir:
            self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)

        if not os.access(self.tmpdir, os.W_OK):
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                pass

        if not (os.access(self.tmpdir, os.W_OK) and os.path.isdir(self.tmpdir)):
            self.tmpdir = os.path.join(tempfile.gettempdir(),
                                       'autotest-' + getpass.getuser())
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may. If the
                # problem was something other than the directory already
                # existing, this chmod will raise an exception as well.
                os.chmod(self.tmpdir, stat.S_IRWXU)

        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        if group_name:
            job_data['host_group_name'] = group_name
        if self.resultdir:
            # only write these keyvals out on the first job in a resultdir
            if 'job_started' not in utils.read_keyval(self.resultdir):
                job_data.update(get_site_job_data(self))
                utils.write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(self.autodir,
                                             run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')

        self.num_tests_run = 0
        self.num_tests_failed = 0

        self._register_subcommand_hooks()
        self._test_tag_prefix = None


    @staticmethod
    def _load_control_file(path):
        f = open(path)
        try:
            control_file = f.read()
        finally:
            f.close()
        return re.sub('\r', '', control_file)


    def _register_subcommand_hooks(self):
        """
        Register some hooks into the subcommand modules that allow us
        to properly clean up self.hosts created in forked subprocesses.
        """
        def on_fork(cmd):
            self._existing_hosts_on_fork = set(self.hosts)
        def on_join(cmd):
            new_hosts = self.hosts - self._existing_hosts_on_fork
            for host in new_hosts:
                host.close()
        subcommand.subcommand.register_fork_hook(on_fork)
        subcommand.subcommand.register_join_hook(on_join)


    def init_parser(self, resultdir):
        """
        Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary.
        """
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job, self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """
        This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)
        """
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self.ssh_user,
                         'ssh_port' : self.ssh_port,
                         'ssh_pass' : self.ssh_pass}
            self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n' + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        if self.resultdir:
            os.chdir(self.resultdir)
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}

        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.
        """
        pass


    def set_default_profile_only(self, val):
        """ Set the default_profile_only mode. """
        self.default_profile_only = val


    def enable_test_cleanup(self):
        """
        Make tests run test.cleanup by default.
        """
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """
        Make tests skip test.cleanup by default.
        """
        self.run_test_cleanup = False


    def use_external_logging(self):
        """
        Return True if external logging should be used.
        """
        return False


    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple."""
        is_forking = not (len(machines) == 1 and self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": machine})
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : machine,
                                'status_version' : str(self.STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            wrapper = function
        return wrapper


    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.

        @param function: A callable to run in parallel given each machine.
        @param machines: A list of machine names to be passed one per
                subcommand invocation of function.
        @param log: If True, output will be written to a subdirectory named
                after each machine.
        @param timeout: Seconds after which the function call should timeout.
        @param return_results: If True, a list of the results/exceptions from
                the function called on each machine is returned instead of an
                error.AutoservError being raised on any failure.
                [default: False]

        @raises error.AutotestError: If any of the functions failed.
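
        Example (an illustrative sketch; do_verify is assumed to be defined
        by the caller):
            job.parallel_simple(do_verify, machines)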
386        """
387        wrapper = self._make_parallel_wrapper(function, machines, log)
388        return subcommand.parallel_simple(wrapper, machines,
389                                          log=log, timeout=timeout,
390                                          return_results=return_results)
391
392
    def parallel_on_machines(self, function, machines, timeout=None):
        """
        @param function: Called in parallel with one machine as its argument.
        @param machines: A list of machines to call function(machine) on.
        @param timeout: Seconds after which the function call should timeout.

        @returns A list of machines on which function(machine) returned
                without raising an exception.
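
        Example (an illustrative sketch; do_install is assumed to be defined
        by the caller):
            installed = job.parallel_on_machines(do_install, machines)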
401        """
402        results = self.parallel_simple(function, machines, timeout=timeout,
403                                       return_results=True)
404        success_machines = []
405        for result, machine in itertools.izip(results, machines):
406            if not isinstance(result, Exception):
407                success_machines.append(machine)
408        return success_machines
409
410
    USE_TEMP_DIR = object()
    def run(self, cleanup=False, install_before=False, install_after=False,
            collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, only_collect_crashinfo=False):
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        if self.resultdir and not os.path.exists(self.uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self.uncollected_log_file, "w")
                pickle.dump([], log_file)
                log_file.close()

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            control = self.control
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                if install_before and machines:
                    self._execute_code(INSTALL_CONTROL_FILE, namespace)

                if only_collect_crashinfo:
                    return

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self.USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                        suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   SERVER_CONTROL_FILENAME)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self.client:
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)
                logging.info("Processing control file")
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")

                # no error occurred, so we don't need to collect crashinfo
                collect_crashinfo = False
            except:
                try:
                    logging.exception(
                            'Exception escaped control file, job aborting:')
                except:
                    pass # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception, e:
                    logging.warn('Could not remove temp directory %s: %s',
                                 temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    # includes crashdumps
                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
                else:
                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
            if self.uncollected_log_file:
                os.remove(self.uncollected_log_file)
            self.disable_external_logging()
            if cleanup and machines:
                self._execute_code(CLEANUP_CONTROL_FILE, namespace)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)


    def set_test_tag_prefix(self, tag=''):
        """
        Set a tag to be prepended (separated by a '.') to the test name of
        all following run_test steps.
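
        For example, after job.set_test_tag_prefix('stress'), a subsequent
        run_test('sleeptest', tag='1') is recorded as 'sleeptest.stress.1'.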
524        """
525        self._test_tag_prefix = tag
526
527
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        url
                url of the test to run
        tag
                tag to add to testname
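
        Example (an illustrative sketch; sleeptest and its seconds parameter
        are assumed):
            passed = job.run_test('sleeptest', tag='quick', seconds=1)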
536        """
537
538        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
539
540        tag = dargs.pop('tag', None)
541        if tag is None:
542            tag = self._test_tag_prefix
543        elif self._test_tag_prefix:
544            tag = '%s.%s' % (self._test_tag_prefix, tag)
545
546        if tag:
547            testname += '.' + str(tag)
548        subdir = testname
549
550        outputdir = os.path.join(self.resultdir, subdir)
551        if os.path.exists(outputdir):
552            msg = ("%s already exists, test <%s> may have"
553                   " already run with tag <%s>" % (outputdir, testname, tag))
554            raise error.TestError(msg)
555        os.mkdir(outputdir)
556
557        def group_func():
558            try:
559                test.runtest(self, url, tag, args, dargs)
560            except error.TestBaseException, e:
561                self.record(e.exit_status, subdir, testname, str(e))
562                raise
563            except Exception, e:
564                info = str(e) + "\n" + traceback.format_exc()
565                self.record('FAIL', subdir, testname, info)
566                raise
567            else:
568                self.record('GOOD', subdir, testname, 'completed successfully')
569
570        result, exc_info = self._run_group(testname, subdir, group_func)
571        if exc_info and isinstance(exc_info[1], error.TestBaseException):
572            return False
573        elif exc_info:
574            raise exc_info[0], exc_info[1], exc_info[2]
575        else:
576            return True
577
578
    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name)
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function
        tag:
                optional keyword argument; used as the group name in place
                of function.__name__
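
        Example (an illustrative sketch):
            def setup_group():
                job.run_test('sleeptest')
            job.run_group(setup_group, tag='setup')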
612        """
613
614        name = function.__name__
615
616        # Allow the tag for the group to be specified.
617        tag = dargs.pop('tag', None)
618        if tag:
619            name = tag
620
621        return self._run_group(name, None, function, *args, **dargs)[0]
622
623
    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
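
        Example (an illustrative sketch, assuming a host object that
        provides reboot and get_kernel_ver callables):
            job.run_reboot(host.reboot, host.get_kernel_ver)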
634        """
635
636        old_record_prefix = self.record_prefix
637        try:
638            self.record('START', None, 'reboot')
639            self.record_prefix += '\t'
640            reboot_func()
641        except Exception, e:
642            self.record_prefix = old_record_prefix
643            err_msg = str(e) + '\n' + traceback.format_exc()
644            self.record('END FAIL', None, 'reboot', err_msg)
645            raise
646        else:
647            kernel = get_kernel_func()
648            self.record_prefix = old_record_prefix
649            self.record('END GOOD', None, 'reboot',
650                        optional_fields={"kernel": kernel})
651
652
    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file."""
        path = os.path.join(self.autodir, path)
        control_file = self._load_control_file(path)
        self.run(control=control_file, control_file_dir=self.USE_TEMP_DIR)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)


    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.log.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and are
        governed by self.record_prefix

        Multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
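
        For example, a rendered line might look like this (illustrative;
        the timestamp fields are added automatically by _render_record):

        GOOD\tsleeptest\tsleeptest\ttimestamp=1234567890\tlocaltime=Feb 13 23:31:30\tcompleted successfully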
710        """
711        # poll all our warning loggers for new warnings
712        warnings = self._read_warnings()
713        old_record_prefix = self.record_prefix
714        try:
715            if status_code.startswith("END "):
716                self.record_prefix += "\t"
717            for timestamp, msg in warnings:
718                self._record("WARN", None, None, msg, timestamp)
719        finally:
720            self.record_prefix = old_record_prefix
721
722        # write out the actual status log line
723        self._record(status_code, subdir, operation, status,
724                      optional_fields=optional_fields)
725
726
    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently
        disabled, this method will discard them and they will no longer be
        retrievable.

        Returns a list of (timestamp, message) tuples, where timestamp is an
        integer epoch timestamp."""
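        # Each logger is expected to produce lines of the form
        # "<epoch>\t<msgtype>\t<message>", for example (illustrative; the
        # warning type string is defined by the console loggers):
        # "1234567890\tNETWORK\tlost connection to the console"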
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # parse out the warning
                timestamp, msgtype, msg = line.split('\t', 2)
                timestamp = int(timestamp)
                # if the warning is valid, add it to the results
                if self.warning_manager.is_valid(timestamp, msgtype):
                    warnings.append((timestamp, msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def disable_warnings(self, warning_type):
        self.warning_manager.disable_warnings(warning_type)
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})


    def enable_warnings(self, warning_type):
        self.warning_manager.enable_warnings(warning_type)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def get_status_log_path(self, subdir=None):
        """Return the path to the job status log.

        @param subdir - Optional parameter indicating that you want the path
            to a subdirectory status log.

        @returns The path where the status log should be.
        """
        if self.resultdir:
            if subdir:
                return os.path.join(self.resultdir, subdir, "status.log")
            else:
                return os.path.join(self.resultdir, "status.log")
        else:
            return None


    def _update_uncollected_logs_list(self, update_func):
        """Updates the uncollected logs list in a multi-process safe manner.

        @param update_func - a function that updates the list of uncollected
            logs. Should take one parameter, the list to be updated.
        """
        if self.uncollected_log_file:
            log_file = open(self.uncollected_log_file, "r+")
            fcntl.flock(log_file, fcntl.LOCK_EX)
            try:
                uncollected_logs = pickle.load(log_file)
                update_func(uncollected_logs)
                log_file.seek(0)
                log_file.truncate()
                pickle.dump(uncollected_logs, log_file)
                log_file.flush()
            finally:
                fcntl.flock(log_file, fcntl.LOCK_UN)
                log_file.close()


    def add_client_log(self, hostname, remote_path, local_path):
        """Adds a new set of client logs to the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.append((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def remove_client_log(self, hostname, remote_path, local_path):
        """Removes a set of client logs from the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.remove((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            if re.search(r'[\n\t]', subdir):
                raise ValueError('Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' % status_code)
        if not operation:
            operation = '----'
        if re.search(r'[\n\t]', operation):
            raise ValueError('Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)
        return record_prefix + msg + '\n'


    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = self.get_status_log_path()
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)


    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with fewer surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
                 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing to be part of a public
        # API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
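
        # With this namespace in place, a minimal server control file can use
        # the injected names directly; an illustrative sketch:
        #     host = hosts.create_host(machines[0])
        #     host.run('uname -a')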


    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect is explicitly set to False, the dict will not
        be modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)


    def _record(self, status_code, subdir, operation, status='',
                 epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should only be called by job.record; calling it directly
        would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation, status,
                                  epoch_time, optional_fields=optional_fields)

        status_file = self.get_status_log_path()
        sys.stdout.write(msg)
        if status_file:
            open(status_file, "a").write(msg)
        if subdir:
            sub_status_file = self.get_status_log_path(subdir)
            open(sub_status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """
        An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


site_server_job = utils.import_site_class(
    __file__, "autotest_lib.server.site_server_job", "site_server_job",
    base_server_job)

class server_job(site_server_job):
    pass


class warning_manager(object):
    """Class for controlling warning logs. Manages the enabling and disabling
    of warnings."""
    def __init__(self):
        # a map of warning types to a list of disabled time intervals
        self.disabled_warnings = {}
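        # e.g. {"NETWORK": [(1234567890, 1234567920), (1234568000, None)]}
        # where an end of None marks an interval that is still open (the
        # type string "NETWORK" is illustrative).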


    def is_valid(self, timestamp, warning_type):
        """Indicates if a warning (based on the time it occurred and its type)
        is a valid warning. A warning is considered "invalid" if this type of
        warning was marked as "disabled" at the time the warning occurred."""
        disabled_intervals = self.disabled_warnings.get(warning_type, [])
        for start, end in disabled_intervals:
            if timestamp >= start and (end is None or timestamp < end):
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        intervals = self.disabled_warnings.setdefault(warning_type, [])
        if not intervals or intervals[-1][1] is not None:
            intervals.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        intervals = self.disabled_warnings.get(warning_type, [])
        if intervals and intervals[-1][1] is None:
            intervals[-1] = (intervals[-1][0], int(current_time_func()))
1115