server_job.py revision bfb32f8d1b5b83506f7a91dd34efcc6e1feede5d
1"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
9__author__ = """
10Martin J. Bligh <mbligh@google.com>
11Andy Whitcroft <apw@shadowen.org>
12"""
13
14import getpass, os, sys, re, stat, tempfile, time, select, subprocess, traceback
15
16from autotest_lib.client.bin import fd_stack
17from autotest_lib.client.common_lib import error, log
18from autotest_lib.server import test, subcommand
19from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
20from autotest_lib.client.common_lib import utils, packages
21
22
23# load up a control segment
24# these are all stored in <server_dir>/control_segments
25def load_control_segment(name):
26    server_dir = os.path.dirname(os.path.abspath(__file__))
27    script_file = os.path.join(server_dir, "control_segments", name)
28    if os.path.exists(script_file):
29        return file(script_file).read()
30    else:
31        return ""
32
33
preamble = """\
import os, sys

from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler
from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel
from autotest_lib.server import git_kernel
from autotest_lib.server.subcommand import *
from autotest_lib.server.utils import run, get_tmp_dir, sh_escape
from autotest_lib.server.utils import parse_machine
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.common_lib import barrier

autotest.Autotest.job = job
hosts.Host.job = job
barrier = barrier.barrier
if len(machines) > 1:
    open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""

client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
    hostname, user, passwd, port = parse_machine(
        machine, ssh_user, ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port, password=passwd)
    host.log_kernel()
    at.run(control, host=host)

job.parallel_simple(run_client, machines)
"""

crashdumps = """
def crashdumps(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.get_crashdumps(test_start_time)

job.parallel_simple(crashdumps, machines, log=False)
"""


crashinfo = """
def crashinfo(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.get_crashinfo(test_start_time)

job.parallel_simple(crashinfo, machines, log=False)
"""


reboot_segment = """\
def reboot(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.reboot()

job.parallel_simple(reboot, machines, log=False)
"""

install = """\
def install(machine):
    hostname, user, passwd, port = parse_machine(machine, ssh_user,
                                                 ssh_port, ssh_pass)

    host = hosts.create_host(hostname, user=user, port=port,
                             initialize=False, password=passwd)
    host.machine_install()

job.parallel_simple(install, machines, log=False)
"""

# load up the verifier control segment, with an optional site-specific hook
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")


# load up site-specific code for generating site-specific job data
try:
    import site_job
    get_site_job_data = site_job.get_site_job_data
    del site_job
except ImportError:
    # by default provide a stub that generates no site data
    def get_site_job_data(job):
        return {}
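
# Illustration only: a site_job module is expected to provide
# get_site_job_data(job) returning a dict of extra keyvals to merge into
# the job keyval file, e.g. (made-up keys) {'site_build': '1.2.3'}.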


class base_server_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
            serverdir
                    <autodir>/server/
            clientdir
                    <autodir>/client/
            conmuxdir
                    <autodir>/conmux/
            testdir
                    <autodir>/server/tests/
            site_testdir
                    <autodir>/server/site_tests/
            control
                    the control file for this job
    """

    STATUS_VERSION = 1


    def __init__(self, control, args, resultdir, label, user, machines,
                 client=False, parse_job='',
                 ssh_user='root', ssh_port=22, ssh_pass=''):
        """
                control
                        The pathname of the control file
                args
                        args to pass to the control file
                resultdir
                        where to throw the results
                label
                        label for the job
                user
                        Username for the job (email address)
                machines
                        A list of hostnames of the machines to run on
                client
                        True if this is a client-side control file
        """
        path = os.path.dirname(__file__)
        self.autodir = os.path.abspath(os.path.join(path, '..'))
        self.serverdir = os.path.join(self.autodir, 'server')
        self.testdir   = os.path.join(self.serverdir, 'tests')
        self.site_testdir = os.path.join(self.serverdir, 'site_tests')
        self.tmpdir    = os.path.join(self.serverdir, 'tmp')
        self.conmuxdir = os.path.join(self.autodir, 'conmux')
        self.clientdir = os.path.join(self.autodir, 'client')
        self.toolsdir = os.path.join(self.autodir, 'client/tools')
        if control:
            self.control = open(control, 'r').read()
            self.control = re.sub('\r', '', self.control)
        else:
            self.control = None
        self.resultdir = resultdir
        if not os.path.exists(resultdir):
            os.mkdir(resultdir)
        self.debugdir = os.path.join(resultdir, 'debug')
        if not os.path.exists(self.debugdir):
            os.mkdir(self.debugdir)
        self.status = os.path.join(resultdir, 'status')
        self.label = label
        self.user = user
        self.args = args
        self.machines = machines
        self.client = client
        self.record_prefix = ''
        self.warning_loggers = set()
        self.ssh_user = ssh_user
        self.ssh_port = ssh_port
        self.ssh_pass = ssh_pass
        self.run_test_cleanup = True

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)

        if not os.access(self.tmpdir, os.W_OK):
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                pass

        if (not os.access(self.tmpdir, os.W_OK) or
                not os.path.isdir(self.tmpdir)):
            self.tmpdir = os.path.join(tempfile.gettempdir(),
                                       'autotest-' + getpass.getuser())
            try:
                os.makedirs(self.tmpdir, 0700)
            except os.error, e:
                # Thrown if the directory already exists, which it may.
                # If the problem was something other than the directory
                # already existing, this chmod should raise an exception
                # as well.
                os.chmod(self.tmpdir, stat.S_IRWXU)

        if os.path.exists(self.status):
            os.unlink(self.status)
        job_data = {'label' : label, 'user' : user,
                    'hostname' : ','.join(machines),
                    'status_version' : str(self.STATUS_VERSION)}
        job_data.update(get_site_job_data(self))
        utils.write_keyval(self.resultdir, job_data)

        self.parse_job = parse_job
        if self.parse_job and len(machines) == 1:
            self.using_parser = True
            self.init_parser(resultdir)
        else:
            self.using_parser = False
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':600})
        self.pkgdir = os.path.join(self.autodir, 'packages')
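
    # Illustrative only: a typical instantiation (made-up paths and values)
    # might look like
    #   job = server_job('/tmp/control', [], '/usr/local/autotest/results/1',
    #                    'kernel-test', 'user@example.com', ['host1'])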


    def init_parser(self, resultdir):
        """Start the continuous parsing of resultdir. This sets up
        the database connection and inserts the basic job object into
        the database if necessary."""
        # redirect parser debugging to .parse.log
        parse_log = os.path.join(resultdir, '.parse.log')
        parse_log = open(parse_log, 'w', 0)
        tko_utils.redirect_parser_debugging(parse_log)
        # create a job model object and set up the db
        self.results_db = tko_db.db(autocommit=True)
        self.parser = status_lib.parser(self.STATUS_VERSION)
        self.job_model = self.parser.make_job(resultdir)
        self.parser.start(self.job_model)
        # check if a job already exists in the db and insert it if
        # it does not
        job_idx = self.results_db.find_job(self.parse_job)
        if job_idx is None:
            self.results_db.insert_job(self.parse_job,
                                       self.job_model)
        else:
            machine_idx = self.results_db.lookup_machine(
                self.job_model.machine)
            self.job_model.index = job_idx
            self.job_model.machine_idx = machine_idx


    def cleanup_parser(self):
        """This should be called after the server job is finished
        to carry out any remaining cleanup (e.g. flushing any
        remaining test results to the results db)"""
        if not self.using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self.using_parser = False


    def verify(self):
        if not self.machines:
            raise error.AutoservError(
                'No machines specified to verify')
        try:
            namespace = {'machines' : self.machines, 'job' : self,
                         'ssh_user' : self.ssh_user,
                         'ssh_port' : self.ssh_port,
                         'ssh_pass' : self.ssh_pass}
            self._execute_code(preamble + verify, namespace)
        except Exception, e:
            msg = ('Verify failed\n' + str(e) + '\n'
                    + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise


    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port,
                     'ssh_pass': self.ssh_pass,
                     'protection_level': host_protection}
        # no matter what happens during repair, go on to try to reverify
        try:
            self._execute_code(preamble + repair, namespace)
        except Exception, exc:
            print 'Exception occurred during repair'
            traceback.print_exc()
        self.verify()


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """ Pause or stop external logging mechanism.
        """
        pass


    def enable_test_cleanup(self):
        """ Enable running test.cleanup after each test. """
        self.run_test_cleanup = True


    def disable_test_cleanup(self):
        """ Disable running test.cleanup after each test. """
        self.run_test_cleanup = False


    def use_external_logging(self):
        """Return True if external logging should be used.
        """
        return False


    def parallel_simple(self, function, machines, log=True, timeout=None):
        """Run 'function' using parallel_simple, with an extra
        wrapper to handle the necessary setup for continuous parsing,
        if possible. If continuous parsing is already properly
        initialized then this should just work."""
        is_forking = not (len(machines) == 1 and
                          self.machines == machines)
        if self.parse_job and is_forking and log:
            def wrapper(machine):
                self.parse_job += "/" + machine
                self.using_parser = True
                self.machines = [machine]
                self.resultdir = os.path.join(self.resultdir,
                                              machine)
                os.chdir(self.resultdir)
                self.init_parser(self.resultdir)
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                self.resultdir = os.path.join(self.resultdir, machine)
                os.chdir(self.resultdir)
                result = function(machine)
                return result
        else:
            wrapper = function
        subcommand.parallel_simple(wrapper, machines, log, timeout)
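
    # For example, with machines=['host1', 'host2'] and a parse_job set,
    # each forked subprocess running 'wrapper' re-points self.resultdir
    # at <resultdir>/<machine> and starts its own parser, so results are
    # parsed independently per machine.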


    def run(self, reboot=False, install_before=False,
            install_after=False, collect_crashdumps=True,
            namespace={}):
        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self.ssh_user
        namespace['ssh_port'] = self.ssh_port
        namespace['ssh_pass'] = self.ssh_pass
        test_start_time = int(time.time())

        os.chdir(self.resultdir)

        self.enable_external_logging()
        status_log = os.path.join(self.resultdir, 'status.log')
        collect_crashinfo = True
        try:
            if install_before and machines:
                self._execute_code(preamble + install, namespace)
            if self.client:
                namespace['control'] = self.control
                open('control', 'w').write(self.control)
                open('control.srv', 'w').write(client_wrapper)
                server_control = client_wrapper
            else:
                open('control.srv', 'w').write(self.control)
                server_control = self.control
            self._execute_code(preamble + server_control, namespace)

            # disable crashinfo collection if we get this far without error
            collect_crashinfo = False
        finally:
            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    script = crashinfo # includes crashdumps
                else:
                    script = crashdumps
                self._execute_code(preamble + script, namespace)
            self.disable_external_logging()
            if reboot and machines:
                self._execute_code(preamble + reboot_segment, namespace)
            if install_after and machines:
                self._execute_code(preamble + install, namespace)


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
                url of the test to run
        tag
                optional keyword argument; a tag to append to the
                test name
        """

        (group, testname) = self.pkgmgr.get_package_name(url, 'test')

        tag = dargs.pop('tag', None)
        if tag:
            testname += '.' + tag
        subdir = testname

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag))
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True


    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name, str(e))
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function
        tag:
                optional keyword argument; an alternate name for the
                group, used instead of function.__name__
        """

        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]


    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot

        get_kernel_func: a function that returns a string
        representing the kernel version.
        """

        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
        else:
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})


    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: see common_lib.log.is_valid_status()
                     for valid status definition

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and are
        governed by self.record_prefix

        Multiline messages have secondary lines prefaced by a double
        space ('  ')

        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        for timestamp, msg in warnings:
            self._record("WARN", None, None, msg, timestamp)

        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                      optional_fields=optional_fields)
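
    # Illustrative example (made-up timestamps): a call like
    # record('GOOD', 'dbench', 'dbench', 'completed successfully') renders
    # as a single tab-separated line roughly like
    #   GOOD\tdbench\tdbench\ttimestamp=1200000000\tlocaltime=Jan 10 12:00:00\tcompleted successfully
    # (optional-field order may vary); see _render_record() below for the
    # exact field order.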


    def _read_warnings(self):
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers,
                                          [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                timestamp, msg = line.split('\t', 1)
                warnings.append((int(timestamp), msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings
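
    # Each warning logger is expected to produce lines of the form
    # "<epoch timestamp>\t<message>\n", e.g. (made-up values):
    #   1200000000\tsoft lockup detected
    # which _read_warnings() above splits on the first tab.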


    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            if re.search(r'[\n\t]', subdir):
                raise ValueError(
                    'Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' %
                             status_code)
        if not operation:
            operation = '----'
        if re.search(r'[\n\t]', operation):
            raise ValueError(
                'Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)

        return record_prefix + msg + '\n'


    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = os.path.join(self.resultdir, 'status.log')
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)


    def _execute_code(self, code, scope):
        exec(code, scope, scope)


    def _record(self, status_code, subdir, operation, status='',
                 epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly; only by job.record, as
        calling it directly would bypass the console monitor logging.
        """

        msg = self._render_record(status_code, subdir, operation,
                                  status, epoch_time,
                                  optional_fields=optional_fields)

        status_file = os.path.join(self.resultdir, 'status.log')
        sys.stdout.write(msg)
        open(status_file, "a").write(msg)
        if subdir:
            test_dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(test_dir, 'status.log')
            open(status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())


    def __parse_status(self, new_lines):
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)


    def __insert_test(self, test):
        """ An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        try:
            self.results_db.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
            print >> sys.stderr, msg


class log_collector(object):
    def __init__(self, host, client_tag, results_dir):
        self.host = host
        if not client_tag:
            client_tag = "default"
        self.client_results_dir = os.path.join(host.get_autodir(), "results",
                                               client_tag)
        self.server_results_dir = results_dir


    def collect_client_job_results(self):
        """ A method that collects all the current results of a running
        client job into the server results dir. It makes an effort to
        wait for the client machine to come up first, but will try to
        fetch the results regardless. """

        # make an effort to wait for the machine to come up
        try:
            self.host.wait_up(timeout=30)
        except error.AutoservError:
            # don't worry about any errors, we'll try and
            # get the results anyway
            pass

        # Copy all dirs in default to results_dir
        try:
            keyval_path = self._prepare_for_copying_logs()
            self.host.get_file(self.client_results_dir + '/',
                               self.server_results_dir)
            self._process_copied_logs(keyval_path)
            self._postprocess_copied_logs()
        except Exception:
            # well, don't stop running just because we couldn't get logs
            print "Unexpected error copying test result logs, continuing ..."
            traceback.print_exc(file=sys.stdout)


    def _prepare_for_copying_logs(self):
        server_keyval = os.path.join(self.server_results_dir, 'keyval')
        if not os.path.exists(server_keyval):
            # Client-side keyval file can be copied directly
            return

        # Copy client-side keyval to temporary location
        suffix = '.keyval_%s' % self.host.hostname
        fd, keyval_path = tempfile.mkstemp(suffix)
        os.close(fd)
        try:
            client_keyval = os.path.join(self.client_results_dir, 'keyval')
            try:
                self.host.get_file(client_keyval, keyval_path)
            finally:
                # Squirrel the client-side keyval away and move it
                # back when we are done
                remote_temp_dir = self.host.get_tmp_dir()
                self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
                self.host.run('mv %s %s' % (client_keyval,
                                            self.temp_keyval_path))
        except (error.AutoservRunError, error.AutoservSSHTimeout):
            print "Prepare for copying logs failed"
        return keyval_path


    def _process_copied_logs(self, keyval_path):
        if not keyval_path:
            # Client-side keyval file was copied directly
            return

        # Append contents of keyval_<host> file to keyval file
        try:
            # Read in new and old keyval files
            new_keyval = utils.read_keyval(keyval_path)
            old_keyval = utils.read_keyval(self.server_results_dir)
            # 'Delete' from new keyval entries that are in both
            tmp_keyval = {}
            for key, val in new_keyval.iteritems():
                if key not in old_keyval:
                    tmp_keyval[key] = val
            # Append new info to keyval file
            utils.write_keyval(self.server_results_dir, tmp_keyval)
            # Delete keyval_<host> file
            os.remove(keyval_path)
        except IOError:
            print "Process copied logs failed"


    def _postprocess_copied_logs(self):
        # we can now put our keyval file back
        client_keyval = os.path.join(self.client_results_dir, 'keyval')
        try:
            self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
        except Exception:
            pass


# a file-like object for catching stderr from an autotest client and
# extracting status logs from it
class client_logger(object):
    """Partial file object to write to both stdout and
    the status log file.  We only implement those methods
    utils.run() actually calls.
    """
    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")
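
    # Illustrative examples of the lines these patterns match (values are
    # made up): status_parser matches e.g.
    #   AUTOTEST_STATUS:0.1:\tGOOD\t----\tsleeptest\tcompleted successfully
    # capturing the tag ("0.1") and the quoted status line, while
    # test_complete_parser matches e.g.
    #   AUTOTEST_TEST_COMPLETE:/tmp/fifo
    # capturing the path of the fifo to signal on.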

    def __init__(self, host, tag, server_results_dir):
        self.host = host
        self.job = host.job
        self.log_collector = log_collector(host, tag, server_results_dir)
        self.leftover = ""
        self.last_line = ""
        self.logs = {}


    def _process_log_dict(self, log_dict):
        log_list = log_dict.pop("logs", [])
        for key in sorted(log_dict.iterkeys()):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.logs and print them
        out to stdout and the status log. Note that this processes
        logs in an ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired, it can be reconstructed from the status
        log by looking at the timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        print line
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)
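
    # For illustration (hypothetical contents): after quoted lines with
    # tags "1", "1.2" and "2", self.logs would look like
    #   {1: {"logs": [...lines for 1...],
    #        2: {"logs": [...lines for 1.2...]}},
    #    2: {"logs": [...lines for 2...]}}
    # and _process_log_dict() above flattens it in the order 1, 1.2, 2.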


    def _process_line(self, line):
        """Write out a line of data to the appropriate stream. Status
        lines sent by autotest will be prepended with
        "AUTOTEST_STATUS", and all other lines are ssh error
        messages."""
        status_match = self.status_parser.search(line)
        test_complete_match = self.test_complete_parser.search(line)
        if status_match:
            tag, line = status_match.groups()
            self._process_quoted_line(tag, line)
        elif test_complete_match:
            fifo_path, = test_complete_match.groups()
            self.log_collector.collect_client_job_results()
            self.host.run("echo A > %s" % fifo_path)
        else:
            print line


    def _format_warnings(self, last_line, warnings):
        # use the indentation of whatever the last log line was
        indent = self.extract_indent.match(last_line).group(1)
        # if the last line starts a new group, add an extra indent
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        if log_dict.keys() in ([], ["logs"]):
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sorted(log_dict.iterkeys()):
                if key != "logs":
                    self._process_warnings(last_line,
                                           log_dict[key],
                                           warnings)


    def write(self, data):
        # first check for any new console warnings
        warnings = self.job._read_warnings()
        self._process_warnings(self.last_line, self.logs, warnings)
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._process_line(line)
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        sys.stdout.flush()


    def close(self):
        if self.leftover:
            self._process_line(self.leftover)
        self._process_logs()
        self.flush()


# site_server_job.py may be non-existent or empty; make sure that an
# appropriate site_server_job class is created nevertheless
try:
    from autotest_lib.server.site_server_job import site_server_job
except ImportError:
    class site_server_job(object):
        pass

class server_job(site_server_job, base_server_job):
    pass
