server_job.py revision e29d0e442c8eefe07dfb2533e1a1dafc14b69e0b
1"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
import getpass, os, sys, re, stat, tempfile, time, select, subprocess
import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno
from autotest_lib.client.bin import sysinfo
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import error, log, utils, packages
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.server import test, subcommand, profilers
from autotest_lib.server.hosts import abstract_ssh
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils


def _control_segment_path(name):
    """Get the pathname of the named control segment file."""
    server_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(server_dir, "control_segments", name)


CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'

CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
INSTALL_CONTROL_FILE = _control_segment_path('install')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')

VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')


# by default provide a stub that generates no site data
def _get_site_job_data_dummy(job):
    return {}


# load up site-specific code for generating site-specific job data
get_site_job_data = utils.import_site_function(__file__,
    "autotest_lib.server.site_server_job", "get_site_job_data",
    _get_site_job_data_dummy)


class status_indenter(base_job.status_indenter):
    """Provide a simple integer-backed status indenter."""
    def __init__(self):
        self._indent = 0


    @property
    def indent(self):
        return self._indent


    def increment(self):
        self._indent += 1


    def decrement(self):
        self._indent -= 1


    def get_context(self):
        """Returns a context object for use by job.get_record_context."""
        class context(object):
            def __init__(self, indenter, indent):
                self._indenter = indenter
                self._indent = indent
            def restore(self):
                self._indenter._indent = self._indent
        return context(self, self._indent)

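# A minimal sketch of how the indenter's save/restore context is meant to be
# used (the 'indenter' instance here is illustrative):
#
#     indenter = status_indenter()
#     indenter.increment()
#     context = indenter.get_context()   # remembers indent == 1
#     indenter.increment()               # indent is now 2
#     context.restore()                  # indent is back to 1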

class server_job_record_hook(object):
    """The job.record hook for server job. Used to inject WARN messages from
    the console or vlm whenever new logs are written, and to echo any logs
    to INFO level logging. Implemented as a class so that it can use state to
    block recursive calls, so that the hook can call job.record itself to
    log WARN messages.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        self._being_called = False


    def __call__(self, entry):
        """A wrapper around the 'real' record hook, the _hook method, which
        prevents recursion. This isn't making any effort to be threadsafe;
        the intent is to outright block infinite recursion via a
        job.record->_hook->job.record->_hook->job.record... chain."""
        if self._being_called:
            return
        self._being_called = True
        try:
            self._hook(self._job, entry)
        finally:
            self._being_called = False


    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        entries = []
        # poll all our warning loggers for new warnings
        for timestamp, msg in job._read_warnings():
            warning_entry = base_job.status_log_entry(
                'WARN', None, None, msg, {}, timestamp=timestamp)
            entries.append(warning_entry)
            job.record_entry(warning_entry)
        # echo rendered versions of all the status logs to info
        entries.append(entry)
        for entry in entries:
            rendered_entry = job._logger.render_entry(entry)
            logging.info(rendered_entry)
            job._parse_status(rendered_entry)


class base_server_job(base_job.base_job):
    """The server-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        serverdir
        conmuxdir

        num_tests_run
        num_tests_failed

        warning_manager
        warning_loggers
    """

    _STATUS_VERSION = 1

    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass='',
                 group_name='', tag='',
                 control_filename=SERVER_CONTROL_FILENAME):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param machines: A list of hostnames of the machines to run the
                job on.
        @param client: True if this is a client-side control file.
        @param parse_job: string, if supplied it is the job execution tag that
                the results will be passed through to the TKO parser with.
        @param ssh_user: The SSH username.  [root]
        @param ssh_port: The SSH port number.  [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler.  [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        """
        super(base_server_job, self).__init__(resultdir=resultdir)

        path = os.path.dirname(__file__)
        self.control = control
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self.tag = tag
        self.last_boot_tag = None
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self.profilers = profilers.profilers(self)

        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self._STATUS_VERSION),
                    'job_started' : str(int(time.time()))}
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(get_site_job_data(self))
            utils.write_keyval(self.resultdir, job_data)

        self._parse_job = parse_job
        self._using_parser = (self._parse_job and len(machines) <= 1)
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.num_tests_run = 0
        self.num_tests_failed = 0

        self._register_subcommand_hooks()

        # these components aren't usable on the server
        self.bootloader = None
        self.harness = None

        # set up the status logger
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
            self, self._indenter, 'status.log', 'status.log',
            record_hook=server_job_record_hook(self))


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir, clientdir and serverdir. Assumes
        that this file is located within serverdir and uses __file__ along
        with relative paths to resolve the location.
        """
        serverdir = os.path.abspath(os.path.dirname(__file__))
        autodir = os.path.normpath(os.path.join(serverdir, '..'))
        clientdir = os.path.join(autodir, 'client')
        return autodir, clientdir, serverdir


    def _find_resultdir(self, resultdir):
        """
        Determine the location of resultdir. For server jobs we expect one to
        always be explicitly passed in to __init__, so just return that.
        """
        if resultdir:
            return os.path.normpath(resultdir)
        else:
            return None


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    @staticmethod
    def _load_control_file(path):
        f = open(path)
        try:
            control_file = f.read()
        finally:
            f.close()
        return re.sub('\r', '', control_file)


    def _register_subcommand_hooks(self):
        """
        Register some hooks into the subcommand modules that allow us
        to properly clean up self.hosts created in forked subprocesses.
        """
        def on_fork(cmd):
            self._existing_hosts_on_fork = set(self.hosts)
        def on_join(cmd):
            new_hosts = self.hosts - self._existing_hosts_on_fork
            for host in new_hosts:
                host.close()
        subcommand.subcommand.register_fork_hook(on_fork)
        subcommand.subcommand.register_join_hook(on_join)


    def init_parser(self):
        """
        Start the continuous parsing of self.resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary.
        """
        if not self._using_parser:
            return
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(self.resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self._STATUS_VERSION)
        self.job_model = self.parser.make_job(self.resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self._parse_job)
        if job_idx is None:
            self.results_db.insert_job(self._parse_job, self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """
        This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db).
        """
        if not self._using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self._using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self._ssh_user,
                         'ssh_port' : self._ssh_port,
                         'ssh_pass' : self._ssh_pass}
            self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n' + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        if self.resultdir:
            os.chdir(self.resultdir)
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self._ssh_user, 'ssh_port': self._ssh_port,
                     'ssh_pass': self._ssh_pass,
                     'protection_level': host_protection}

        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.
        """
        pass


    def use_external_logging(self):
        """
        Return True if external logging should be used.
        """
        return False


    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple."""
        is_forking = not (len(machines) == 1 and self.machines == machines)
        if self._parse_job and is_forking and log:
            def wrapper(machine):
                self._parse_job += "/" + machine
                self._using_parser = True
                self.machines = [machine]
                self.push_execution_context(machine)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": machine})
                self.init_parser()
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                self.push_execution_context(machine)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : machine,
                                'status_version' : str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            wrapper = function
        return wrapper


    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.

        @param function: A callable to run in parallel given each machine.
        @param machines: A list of machine names to be passed one per
                subcommand invocation of function.
        @param log: If True, output will be written to a subdirectory named
                after each machine.
        @param timeout: Seconds after which the function call should timeout.
        @param return_results: If True, a list of the results/exceptions from
                the function called on each machine is returned instead of an
                AutoservError being raised on any error.  [default: False]

        @raises error.AutotestError: If any of the functions failed.
        """
        wrapper = self._make_parallel_wrapper(function, machines, log)
        return subcommand.parallel_simple(wrapper, machines,
                                          log=log, timeout=timeout,
                                          return_results=return_results)


    def parallel_on_machines(self, function, machines, timeout=None):
        """
        @param function: Called in parallel with one machine as its argument.
        @param machines: A list of machines to call function(machine) on.
        @param timeout: Seconds after which the function call should timeout.

        @returns A list of machines on which function(machine) returned
                without raising an exception.
        """
        results = self.parallel_simple(function, machines, timeout=timeout,
                                       return_results=True)
        success_machines = []
        for result, machine in itertools.izip(results, machines):
            if not isinstance(result, Exception):
                success_machines.append(machine)
        return success_machines

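    # A minimal usage sketch for parallel_on_machines; 'install_client' is a
    # hypothetical per-machine callable:
    #
    #     def install_client(machine):
    #         ...   # set up the machine, raising an exception on failure
    #
    #     good_machines = job.parallel_on_machines(install_client, machines)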

    _USE_TEMP_DIR = object()
    def run(self, cleanup=False, install_before=False, install_after=False,
            collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, only_collect_crashinfo=False):
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "w")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self._ssh_user
        namespace['ssh_port'] = self._ssh_port
        namespace['ssh_pass'] = self._ssh_pass
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                if install_before and machines:
                    self._execute_code(INSTALL_CONTROL_FILE, namespace)

                if only_collect_crashinfo:
                    return

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                        suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)
                logging.info("Processing control file")
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")

                # no error occurred, so we don't need to collect crashinfo
                collect_crashinfo = False
            except:
                try:
                    logging.exception(
                            'Exception escaped control file, job aborting:')
                except:
                    pass # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception, e:
                    logging.warn('Could not remove temp directory %s: %s',
                                 temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    # includes crashdumps
                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
                else:
                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)
            self.disable_external_logging()
            if cleanup and machines:
                self._execute_code(CLEANUP_CONTROL_FILE, namespace)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)


    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url: url of the test to run
        @param tag: tag to add to testname (optional, passed as a keyword
                argument)
        """
        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True

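    # A minimal usage sketch for run_test (the test name and tag are
    # illustrative):
    #
    #     passed = job.run_test('sleeptest', tag='quick')
    #     if not passed:
    #         logging.info('sleeptest failed, continuing with the job')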

    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        try:
            self.record('START', subdir, name)
            result = function(*args, **dargs)
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name)
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function
        tag:
                optional keyword argument naming the group (defaults to
                the function's __name__)
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]

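    # A minimal usage sketch for run_group; 'configure_raid' is a
    # hypothetical callable:
    #
    #     def configure_raid():
    #         ...   # group body
    #
    #     job.run_group(configure_raid, tag='raid_setup')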

    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """
        try:
            self.record('START', None, 'reboot')
            reboot_func()
        except Exception, e:
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
            raise
        else:
            kernel = get_kernel_func()
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})

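    # A minimal usage sketch for run_reboot; 'reboot_host' and
    # 'read_kernel_version' are hypothetical callables:
    #
    #     job.run_reboot(reboot_host, read_kernel_version)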

    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file."""
        path = os.path.join(self.autodir, path)
        control_file = self._load_control_file(path)
        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)


    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently
        disabled, this method will discard them and they will no longer be
        retrievable.

        Returns a list of (timestamp, message) tuples, where timestamp is an
        integer epoch timestamp."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # parse out the warning
                timestamp, msgtype, msg = line.split('\t', 2)
                timestamp = int(timestamp)
                # if the warning is valid, add it to the results
                if self.warning_manager.is_valid(timestamp, msgtype):
                    warnings.append((timestamp, msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings

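    # Warning loggers feed _read_warnings() tab-separated lines of the form
    # "<epoch timestamp>\t<warning type>\t<message>\n"; for example
    # (illustrative values):
    #
    #     1234567890\tNETWORK\teth0 link lost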

    def _unique_subdirectory(self, base_subdirectory_name):
        """Compute a unique results subdirectory based on the given name.

        Appends base_subdirectory_name with a number as necessary to find a
        directory name that doesn't already exist.
        """
        subdirectory = base_subdirectory_name
        counter = 1
        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
            subdirectory = base_subdirectory_name + '.' + str(counter)
            counter += 1
        return subdirectory


    def get_record_context(self):
        """Returns an object representing the current job.record context.

        The object returned is an opaque object with a 0-arg restore method
        which can be called to restore the job.record context (i.e. indentation)
        to the current level. The intention is that it should be used when
        something external which generates job.record calls (e.g. an autotest
        client) can fail catastrophically and the server job record state
        needs to be reset to its original "known good" state.

        @return: A context object with a 0-arg restore() method."""
        return self._indenter.get_context()

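    # A minimal sketch of the intended restore pattern; 'run_client' is a
    # hypothetical callable:
    #
    #     context = job.get_record_context()
    #     try:
    #         run_client()
    #     except Exception:
    #         context.restore()   # reset record indentation to a known state
    #         raise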

    def record_summary(self, status_code, test_name, reason='', attributes=None,
                       distinguishing_attributes=(), child_test_ids=None):
        """Record a summary test result.

        @param status_code: status code string, see
                common_lib.log.is_valid_status()
        @param test_name: name of the test
        @param reason: (optional) string providing detailed reason for test
                outcome
        @param attributes: (optional) dict of string keyvals to associate with
                this result
        @param distinguishing_attributes: (optional) list of attribute names
                that should be used to distinguish identically-named test
                results.  These attributes should be present in the attributes
                parameter.  This is used to generate user-friendly subdirectory
                names.
        @param child_test_ids: (optional) list of test indices for test results
                used in generating this result.
        """
        subdirectory_name_parts = [test_name]
        for attribute in distinguishing_attributes:
            assert attributes
            assert attribute in attributes, '%s not in %s' % (attribute,
                                                              attributes)
            subdirectory_name_parts.append(attributes[attribute])
        base_subdirectory_name = '.'.join(subdirectory_name_parts)

        subdirectory = self._unique_subdirectory(base_subdirectory_name)
        subdirectory_path = os.path.join(self.resultdir, subdirectory)
        os.mkdir(subdirectory_path)

        self.record(status_code, subdirectory, test_name,
                    status=reason, optional_fields={'is_summary': True})

        if attributes:
            utils.write_keyval(subdirectory_path, attributes)

        if child_test_ids:
            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
            summary_data = {'child_test_ids': ids_string}
            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
                               summary_data)

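    # A minimal usage sketch for record_summary (all values illustrative):
    #
    #     job.record_summary('GOOD', 'netperf', reason='',
    #                        attributes={'kernel': '2.6.30',
    #                                    'direction': 'tx'},
    #                        distinguishing_attributes=('direction',))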

    def disable_warnings(self, warning_type):
        self.warning_manager.disable_warnings(warning_type)
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})


    def enable_warnings(self, warning_type):
        self.warning_manager.enable_warnings(warning_type)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})

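    # A typical pattern is to suppress warnings of a given type around an
    # operation expected to trigger them; the type name and helper here are
    # illustrative:
    #
    #     job.disable_warnings("POWER_BAD")
    #     try:
    #         power_cycle_machine()      # hypothetical helper
    #     finally:
    #         job.enable_warnings("POWER_BAD")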

    def get_status_log_path(self, subdir=None):
        """Return the path to the job status log.

        @param subdir - Optional parameter indicating that you want the path
            to a subdirectory status log.

        @returns The path where the status log should be.
        """
        if self.resultdir:
            if subdir:
                return os.path.join(self.resultdir, subdir, "status.log")
            else:
                return os.path.join(self.resultdir, "status.log")
        else:
            return None


    def _update_uncollected_logs_list(self, update_func):
        """Updates the uncollected logs list in a multi-process safe manner.

        @param update_func - a function that updates the list of uncollected
            logs. Should take one parameter, the list to be updated.
        """
        # if there is no uncollected logs file there is nothing to update
        if not self._uncollected_log_file:
            return
        log_file = open(self._uncollected_log_file, "r+")
        fcntl.flock(log_file, fcntl.LOCK_EX)
        try:
            uncollected_logs = pickle.load(log_file)
            update_func(uncollected_logs)
            log_file.seek(0)
            log_file.truncate()
            pickle.dump(uncollected_logs, log_file)
            log_file.flush()
        finally:
            fcntl.flock(log_file, fcntl.LOCK_UN)
            log_file.close()


    def add_client_log(self, hostname, remote_path, local_path):
        """Adds a new set of client logs to the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.append((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def remove_client_log(self, hostname, remote_path, local_path):
        """Removes a set of client logs from the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.remove((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def get_client_logs(self):
        """Retrieves the list of uncollected logs, if it exists.

        @returns A list of (host, remote_path, local_path) tuples. Returns
                 an empty list if no uncollected logs file exists.
        """
        log_exists = (self._uncollected_log_file and
                      os.path.exists(self._uncollected_log_file))
        if log_exists:
            return pickle.load(open(self._uncollected_log_file))
        else:
            return []


    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with fewer surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
                 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing to be part of a public API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self


    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect is explicitly set to False, the dict will not be
        modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)


    def _parse_status(self, new_line):
        if not self._using_parser:
            return
        new_tests = self.parser.process_lines([new_line])
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """
        An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


    def preprocess_client_state(self):
        """
        Produce a state file for initializing the state of a client job.

        Creates a new client state file with all the current server state, as
        well as some pre-set client state.

        @returns The path of the file the state was written into.
        """
        # initialize the sysinfo state
        self._state.set('client', 'sysinfo', self.sysinfo.serialize())

        # dump the state out to a tempfile
        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
        os.close(fd)

        # write_to_file doesn't need locking, we exclusively own file_path
        self._state.write_to_file(file_path)
        return file_path


    def postprocess_client_state(self, state_path):
        """
        Update the state of this job with the state from a client job.

        Updates the state of the server side of a job with the final state
        of a client job that was run. Updates the non-client-specific state,
        pulls in some specific bits from the client-specific state, and then
        discards the rest. Removes the state file afterwards.

        @param state_path: A path to the state file from the client.
        """
        # update the on-disk state
        try:
            self._state.read_from_file(state_path)
            os.remove(state_path)
        except OSError, e:
            # ignore file-not-found errors
            if e.errno != errno.ENOENT:
                raise
            else:
                logging.debug('Client state file %s not found', state_path)

        # update the sysinfo state
        if self._state.has('client', 'sysinfo'):
            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))

        # drop all the client-specific state
        self._state.discard_namespace('client')

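    # A minimal sketch of the intended round trip between these two methods;
    # 'push_to_client' and 'fetch_from_client' stand in for however the file
    # actually travels to and from the client:
    #
    #     state_path = job.preprocess_client_state()
    #     push_to_client(state_path)               # hypothetical transfer
    #     ... client job runs and rewrites the state file ...
    #     fetch_from_client(state_path)            # hypothetical transfer
    #     job.postprocess_client_state(state_path)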

    def clear_all_known_hosts(self):
        """Clears known hosts files for all AbstractSSHHosts."""
        for host in self.hosts:
            if isinstance(host, abstract_ssh.AbstractSSHHost):
                host.clear_known_hosts()


site_server_job = utils.import_site_class(
    __file__, "autotest_lib.server.site_server_job", "site_server_job",
    base_server_job)

class server_job(site_server_job):
    pass


class warning_manager(object):
    """Class for controlling warning logs. Manages the enabling and disabling
    of warnings."""
    def __init__(self):
        # a map of warning types to a list of disabled time intervals
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Indicates if a warning (based on the time it occurred and its type)
        is a valid warning. A warning is considered "invalid" if this type of
        warning was marked as "disabled" at the time the warning occurred."""
        disabled_intervals = self.disabled_warnings.get(warning_type, [])
        for start, end in disabled_intervals:
            if timestamp >= start and (end is None or timestamp < end):
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        intervals = self.disabled_warnings.setdefault(warning_type, [])
        if not intervals or intervals[-1][1] is not None:
            intervals.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        intervals = self.disabled_warnings.get(warning_type, [])
        if intervals and intervals[-1][1] is None:
            intervals[-1] = (intervals[-1][0], int(current_time_func()))
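
# Each disabled interval is a (start, end) epoch pair, with end == None while
# the warning type is still disabled. For example (illustrative values):
#
#     mgr = warning_manager()
#     mgr.disable_warnings("NETWORK", lambda: 100)
#     mgr.enable_warnings("NETWORK", lambda: 200)
#     mgr.is_valid(150, "NETWORK")   # False: inside the [100, 200) interval
#     mgr.is_valid(250, "NETWORK")   # True: after the interval was closed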
1144