autotest.py revision a700772e70548b10721ba709c7ec5194bb6d7597
1# Copyright 2007 Google Inc. Released under the GPL v2
2
3import re, os, sys, traceback, subprocess, tempfile, time, pickle, glob
4from autotest_lib.server import installable_object, utils
5from autotest_lib.client.common_lib import log, error, debug
6from autotest_lib.client.common_lib import global_config, packages
7from autotest_lib.client.common_lib import utils as client_utils
8
9AUTOTEST_SVN  = 'svn://test.kernel.org/autotest/trunk/client'
10AUTOTEST_HTTP = 'http://test.kernel.org/svn/autotest/trunk/client'
11
12# Timeouts for powering down and up respectively
13HALT_TIME = 300
14BOOT_TIME = 1800
15CRASH_RECOVERY_TIME = 9000
16
17
18class BaseAutotest(installable_object.InstallableObject):
19    """
20    This class represents the Autotest program.
21
22    Autotest is used to run tests automatically and collect the results.
23    It also supports profilers.
24
25    Implementation details:
26    This is a leaf class in an abstract class hierarchy, it must
27    implement the unimplemented methods in parent classes.
28    """
29
30    def __init__(self, host = None):
31        self.host = host
32        self.got = False
33        self.installed = False
34        self.serverdir = utils.get_server_dir()
35        super(BaseAutotest, self).__init__()
36        self.logger = debug.get_logger(module='server')
37
38
39    install_in_tmpdir = False
40    @classmethod
41    def set_install_in_tmpdir(cls, flag):
42        """ Sets a flag that controls whether or not Autotest should by
43        default be installed in a "standard" directory (e.g.
44        /home/autotest, /usr/local/autotest) or a temporary directory. """
45        cls.install_in_tmpdir = flag
46
47
48    def _get_install_dir(self, host):
49        """ Determines the location where autotest should be installed on
50        host. If self.install_in_tmpdir is set, it will return a unique
51        temporary directory that autotest can be installed in. """
52        try:
53            autodir = _get_autodir(host)
54        except error.AutotestRunError:
55            autodir = '/usr/local/autotest'
56        if self.install_in_tmpdir:
57            autodir = host.get_tmp_dir(parent=autodir)
58        return autodir
59
60
61    @log.record
62    def install(self, host=None, autodir=None):
63        self._install(host=host, autodir=autodir)
64
65
66    def install_base(self, host=None, autodir=None):
67        """ Performs a lightweight autotest install. Useful for when you
68        want to run some client-side code but don't want to pay the cost
69        of a full installation. """
70        self._install(host=host, autodir=autodir, lightweight=True)
71
72
73    def _install(self, host=None, autodir=None, lightweight=False):
74        """
75        Install autotest.  If get() was not called previously, an
76        attempt will be made to install from the autotest svn
77        repository.
78
79        Args:
80            host: a Host instance on which autotest will be installed
81            autodir: location on the remote host to install to
82            lightweight: exclude tests, deps and profilers, if possible
83
84        Raises:
85            AutoservError: if a tarball was not specified and
86                the target host does not have svn installed in its path"""
87        if not host:
88            host = self.host
89        if not self.got:
90            self.get()
91        host.wait_up(timeout=30)
92        host.setup()
93        print "Installing autotest on %s" % host.hostname
94
95        # set up the autotest directory on the remote machine
96        if not autodir:
97            autodir = self._get_install_dir(host)
98        host.set_autodir(autodir)
99        host.run('mkdir -p "%s"' % utils.sh_escape(autodir))
100
101        # Fetch the autotest client from the nearest repository
102        try:
103            c = global_config.global_config
104            repos = c.get_config_value("PACKAGES", 'fetch_location', type=list)
105            pkgmgr = packages.PackageManager(autodir, hostname=host.hostname,
106                          repo_urls=repos,
107                          do_locking=False,
108                          run_function=host.run,
109                          run_function_dargs=dict(timeout=600))
110            # The packages dir is used to store all the packages that
111            # are fetched on that client. (for the tests,deps etc.
112            # too apart from the client)
113            pkg_dir = os.path.join(autodir, 'packages')
114            # clean up the autodir except for the packages directory
115            host.run('cd %s && ls | grep -v "^packages$"'
116                     ' | xargs rm -rf && rm -rf .[^.]*' % autodir)
117            pkgmgr.install_pkg('autotest', 'client', pkg_dir, autodir,
118                               preserve_install_dir=True)
119            self.installed = True
120            return
121        except global_config.ConfigError, e:
122            print ("Could not install autotest using the"
123                   " packaging system %s" %  e)
124        except (packages.PackageInstallError, error.AutoservRunError), e:
125            print "Could not install autotest from %s : %s " % (repos, e)
126
127
128        # try to install from file or directory
129        if self.source_material:
130            if os.path.isdir(self.source_material):
131                # Copy autotest recursively
132                if lightweight:
133                    dirs_to_exclude = set(["tests", "site_tests", "deps",
134                                           "tools", "profilers"])
135                    light_files = [os.path.join(self.source_material, f)
136                                   for f in os.listdir(self.source_material)
137                                   if f not in dirs_to_exclude]
138                    host.send_file(light_files, autodir, delete_dest=True)
139
140                    # create empty dirs for all the stuff we excluded
141                    commands = []
142                    for path in dirs_to_exclude:
143                        abs_path = os.path.join(autodir, path)
144                        abs_path = utils.sh_escape(abs_path)
145                        commands.append("mkdir -p '%s'" % abs_path)
146                    host.run(';'.join(commands))
147                else:
148                    host.send_file(self.source_material, autodir,
149                                   delete_dest=True)
150            else:
151                # Copy autotest via tarball
152                e_msg = 'Installation method not yet implemented!'
153                raise NotImplementedError(e_msg)
154            print "Installation of autotest completed"
155            self.installed = True
156            return
157
158        # if that fails try to install using svn
159        if utils.run('which svn').exit_status:
160            raise error.AutoservError('svn not found on target machine: %s'
161                                                                   % host.name)
162        try:
163            host.run('svn checkout %s %s' % (AUTOTEST_SVN, autodir))
164        except error.AutoservRunError, e:
165            host.run('svn checkout %s %s' % (AUTOTEST_HTTP, autodir))
166        print "Installation of autotest completed"
167        self.installed = True
168
169
170    def get(self, location = None):
171        if not location:
172            location = os.path.join(self.serverdir, '../client')
173            location = os.path.abspath(location)
174        # If there's stuff run on our client directory already, it
175        # can cause problems. Try giving it a quick clean first.
176        cwd = os.getcwd()
177        os.chdir(location)
178        os.system('tools/make_clean')
179        os.chdir(cwd)
180        super(BaseAutotest, self).get(location)
181        self.got = True
182
183
184    def run(self, control_file, results_dir = '.', host = None,
185            timeout=None, tag=None, parallel_flag=False, background=False):
186        """
187        Run an autotest job on the remote machine.
188
189        Args:
190                control_file: an open file-like-obj of the control file
191                results_dir: a str path where the results should be stored
192                        on the local filesystem
193                host: a Host instance on which the control file should
194                        be run
195                tag: tag name for the client side instance of autotest
196                parallel_flag: flag set when multiple jobs are run at the
197                          same time
198                background: indicates that the client should be launched as
199                            a background job; the code calling run will be
200                            responsible for monitoring the client and
201                            collecting the results
202        Raises:
203                AutotestRunError: if there is a problem executing
204                        the control file
205        """
206        host = self._get_host_and_setup(host)
207        results_dir = os.path.abspath(results_dir)
208
209        if tag:
210            results_dir = os.path.join(results_dir, tag)
211
212        atrun = _Run(host, results_dir, tag, parallel_flag, background)
213        self._do_run(control_file, results_dir, host, atrun, timeout)
214
215
216    def _get_host_and_setup(self, host):
217        if not host:
218            host = self.host
219        if not self.installed:
220            self.install(host)
221
222        host.wait_up(timeout=30)
223        return host
224
225
226    def _do_run(self, control_file, results_dir, host, atrun, timeout):
227        try:
228            atrun.verify_machine()
229        except:
230            print "Verify failed on %s. Reinstalling autotest" % host.hostname
231            self.install(host)
232        atrun.verify_machine()
233        debug = os.path.join(results_dir, 'debug')
234        try:
235            os.makedirs(debug)
236        except Exception:
237            pass
238
239        delete_file_list = [atrun.remote_control_file,
240                            atrun.remote_control_file + '.state',
241                            atrun.manual_control_file,
242                            atrun.manual_control_file + '.state']
243        cmd = ';'.join('rm -f ' + control for control in delete_file_list)
244        host.run(cmd, ignore_status=True)
245
246        tmppath = utils.get(control_file)
247
248        cfile = "job.default_boot_tag(%r)\n" % host.job.last_boot_tag
249        cfile += "job.default_test_cleanup(%r)\n" % host.job.run_test_cleanup
250
251        # If the packaging system is being used, add the repository list.
252        try:
253            c = global_config.global_config
254            repos = c.get_config_value("PACKAGES", 'fetch_location', type=list)
255            pkgmgr = packages.PackageManager('autotest', hostname=host.hostname,
256                                             repo_urls=repos)
257            cfile += 'job.add_repository(%s)\n' % pkgmgr.repo_urls
258        except global_config.ConfigError, e:
259            pass
260
261        cfile += open(tmppath).read()
262        open(tmppath, "w").write(cfile)
263
264        # Create and copy state file to remote_control_file + '.state'
265        sysinfo_state = {"__sysinfo": host.job.sysinfo.serialize()}
266        state_file = self._create_state_file(host.job, sysinfo_state)
267        host.send_file(state_file, atrun.remote_control_file + '.state')
268        os.remove(state_file)
269
270        # Copy control_file to remote_control_file on the host
271        host.send_file(tmppath, atrun.remote_control_file)
272        if os.path.abspath(tmppath) != os.path.abspath(control_file):
273            os.remove(tmppath)
274
275        try:
276            atrun.execute_control(timeout=timeout)
277        finally:
278            if not atrun.background:
279                collector = log_collector(host, atrun.tag, results_dir)
280                collector.collect_client_job_results()
281                self._process_client_state_file(host, atrun, results_dir)
282
283
284    def _create_state_file(self, job, state_dict):
285        """ Create a state file from a dictionary. Returns the path of the
286        state file. """
287        fd, path = tempfile.mkstemp(dir=job.tmpdir)
288        state_file = os.fdopen(fd, "w")
289        pickle.dump(state_dict, state_file)
290        state_file.close()
291        return path
292
293
294    def _process_client_state_file(self, host, atrun, results_dir):
295        state_file = os.path.basename(atrun.remote_control_file) + ".state"
296        state_path = os.path.join(results_dir, state_file)
297        try:
298            state_dict = pickle.load(open(state_path))
299        except Exception, e:
300            msg = "Ignoring error while loading client job state file: %s" % e
301            self.logger.warning(msg)
302            state_dict = {}
303
304        # clear out the state file
305        # TODO: stash the file away somewhere useful instead
306        try:
307            os.remove(state_path)
308        except Exception:
309            pass
310
311        msg = "Persistent state variables pulled back from %s: %s"
312        msg %= (host.hostname, state_dict)
313        print msg
314
315        if "__run_test_cleanup" in state_dict:
316            if state_dict["__run_test_cleanup"]:
317                host.job.enable_test_cleanup()
318            else:
319                host.job.disable_test_cleanup()
320
321        if "__last_boot_tag" in state_dict:
322            host.job.last_boot_tag = state_dict["__last_boot_tag"]
323
324        if "__sysinfo" in state_dict:
325            host.job.sysinfo.deserialize(state_dict["__sysinfo"])
326
327
328    def run_timed_test(self, test_name, results_dir='.', host=None,
329                       timeout=None, *args, **dargs):
330        """
331        Assemble a tiny little control file to just run one test,
332        and run it as an autotest client-side test
333        """
334        if not host:
335            host = self.host
336        if not self.installed:
337            self.install(host)
338        opts = ["%s=%s" % (o[0], repr(o[1])) for o in dargs.items()]
339        cmd = ", ".join([repr(test_name)] + map(repr, args) + opts)
340        control = "job.run_test(%s)\n" % cmd
341        self.run(control, results_dir, host, timeout=timeout)
342
343
344    def run_test(self, test_name, results_dir='.', host=None, *args, **dargs):
345        self.run_timed_test(test_name, results_dir, host, timeout=None,
346                            *args, **dargs)
347
348
349class _Run(object):
350    """
351    Represents a run of autotest control file.  This class maintains
352    all the state necessary as an autotest control file is executed.
353
354    It is not intended to be used directly, rather control files
355    should be run using the run method in Autotest.
356    """
357    def __init__(self, host, results_dir, tag, parallel_flag, background):
358        self.host = host
359        self.results_dir = results_dir
360        self.env = host.env
361        self.tag = tag
362        self.parallel_flag = parallel_flag
363        self.background = background
364        self.autodir = _get_autodir(self.host)
365        control = os.path.join(self.autodir, 'control')
366        if tag:
367            control += '.' + tag
368        self.manual_control_file = control
369        self.remote_control_file = control + '.autoserv'
370        self.logger = debug.get_logger(module='server')
371
372
373    def verify_machine(self):
374        binary = os.path.join(self.autodir, 'bin/autotest')
375        try:
376            self.host.run('ls %s > /dev/null 2>&1' % binary)
377        except:
378            raise "Autotest does not appear to be installed"
379
380        if not self.parallel_flag:
381            tmpdir = os.path.join(self.autodir, 'tmp')
382            download = os.path.join(self.autodir, 'tests/download')
383            self.host.run('umount %s' % tmpdir, ignore_status=True)
384            self.host.run('umount %s' % download, ignore_status=True)
385
386    def get_full_cmd(self, section):
387        # build up the full command we want to run over the host
388        cmd = [os.path.join(self.autodir, 'bin/autotest_client')]
389        if not self.background:
390            cmd.append('-H autoserv')
391        if section > 0:
392            cmd.append('-c')
393        if self.tag:
394            cmd.append('-t %s' % self.tag)
395        if self.host.job.use_external_logging():
396            cmd.append('-l')
397        cmd.append(self.remote_control_file)
398        if self.background:
399            cmd = ['nohup'] + cmd + ['>/dev/null 2>/dev/null &']
400        return ' '.join(cmd)
401
402
403    def get_client_log(self, section):
404        """ Find what the "next" client.log.* file should be and open it. """
405        debug_dir = os.path.join(self.results_dir, "debug")
406        client_logs = glob.glob(os.path.join(debug_dir, "client.log.*"))
407        next_log = os.path.join(debug_dir, "client.log.%d" % len(client_logs))
408        return open(next_log, "w", 0)
409
410
411    @staticmethod
412    def is_client_job_finished(last_line):
413        return bool(re.match(r'^END .*\t----\t----\t.*$', last_line))
414
415
416    @staticmethod
417    def is_client_job_rebooting(last_line):
418        return bool(re.match(r'^\t*GOOD\t----\treboot\.start.*$', last_line))
419
420
421    def log_unexpected_abort(self):
422        msg = "Autotest client terminated unexpectedly"
423        self.host.job.record("END ABORT", None, None, msg)
424
425
426    def execute_section(self, section, timeout, stderr_redirector):
427        print "Executing %s/bin/autotest %s/control phase %d" % \
428                                (self.autodir, self.autodir, section)
429
430        full_cmd = self.get_full_cmd(section)
431        client_log = self.get_client_log(section)
432
433        try:
434            old_resultdir = self.host.job.resultdir
435            self.host.job.resultdir = self.results_dir
436            result = self.host.run(full_cmd, ignore_status=True,
437                                   timeout=timeout,
438                                   stdout_tee=client_log,
439                                   stderr_tee=stderr_redirector)
440        finally:
441            self.host.job.resultdir = old_resultdir
442            last_line = stderr_redirector.last_line
443
444        # check if we failed hard enough to warrant an exception
445        if result.exit_status == 1:
446            err = error.AutotestRunError("client job was aborted")
447        elif not self.background and not result.stderr:
448            err = error.AutotestRunError(
449                "execute_section: %s failed to return anything\n"
450                "stdout:%s\n" % (full_cmd, result.stdout))
451        else:
452            err = None
453
454        # log something if the client failed AND never finished logging
455        if err and not self.is_client_job_finished(last_line):
456            self.log_unexpected_abort()
457
458        if err:
459            raise err
460        else:
461            return stderr_redirector.last_line
462
463
464    def _wait_for_reboot(self):
465        self.logger.info("Client is rebooting")
466        self.logger.info("Waiting for client to halt")
467        if not self.host.wait_down(HALT_TIME):
468            err = "%s failed to shutdown after %d"
469            err %= (self.host.hostname, HALT_TIME)
470            raise error.AutotestRunError(err)
471        self.logger.info("Client down, waiting for restart")
472        if not self.host.wait_up(BOOT_TIME):
473            # since reboot failed
474            # hardreset the machine once if possible
475            # before failing this control file
476            warning = "%s did not come back up, hard resetting"
477            warning %= self.host.hostname
478            self.logger.warning(warning)
479            try:
480                self.host.hardreset(wait=False)
481            except (AttributeError, error.AutoservUnsupportedError):
482                warning = "Hard reset unsupported on %s"
483                warning %= self.host.hostname
484                self.logger.warning(warning)
485            raise error.AutotestRunError("%s failed to boot after %ds" %
486                                         (self.host.hostname, BOOT_TIME))
487        self.host.reboot_followup()
488
489
490    def execute_control(self, timeout=None):
491        section = 0
492        start_time = time.time()
493
494        logger = client_logger(self.host, self.tag, self.results_dir)
495        try:
496            while not timeout or time.time() < start_time + timeout:
497                if timeout:
498                    section_timeout = start_time + timeout - time.time()
499                else:
500                    section_timeout = None
501                last = self.execute_section(section, section_timeout,
502                                            logger)
503                if self.background:
504                    return
505                section += 1
506                if self.is_client_job_finished(last):
507                    print "Client complete"
508                    return
509                elif self.is_client_job_rebooting(last):
510                    try:
511                        self._wait_for_reboot()
512                    except error.AutotestRunError, e:
513                        self.host.job.record("ABORT", None, "reboot", str(e))
514                        self.host.job.record("END ABORT", None, None, str(e))
515                        raise
516                    continue
517
518                # if we reach here, something unexpected happened
519                self.log_unexpected_abort()
520
521                # give the client machine a chance to recover from a crash
522                self.host.wait_up(CRASH_RECOVERY_TIME)
523                msg = ("Aborting - unexpected final status message from "
524                       "client: %s\n") % last
525                raise error.AutotestRunError(msg)
526        finally:
527            logger.close()
528
529        # should only get here if we timed out
530        assert timeout
531        raise error.AutotestTimeoutError()
532
533
534def _get_autodir(host):
535    autodir = host.get_autodir()
536    if autodir:
537        return autodir
538    try:
539        # There's no clean way to do this. readlink may not exist
540        cmd = "python -c 'import os,sys; print os.readlink(sys.argv[1])' /etc/autotest.conf 2> /dev/null"
541        autodir = os.path.dirname(host.run(cmd).stdout)
542        if autodir:
543            return autodir
544    except error.AutoservRunError:
545        pass
546    for path in ['/usr/local/autotest', '/home/autotest']:
547        try:
548            host.run('ls %s > /dev/null 2>&1' %
549                     os.path.join(path, 'bin/autotest'))
550            return path
551        except error.AutoservRunError:
552            pass
553    raise error.AutotestRunError("Cannot figure out autotest directory")
554
555
556class log_collector(object):
557    def __init__(self, host, client_tag, results_dir):
558        self.host = host
559        if not client_tag:
560            client_tag = "default"
561        self.client_results_dir = os.path.join(host.get_autodir(), "results",
562                                               client_tag)
563        self.server_results_dir = results_dir
564
565
566    def collect_client_job_results(self):
567        """ A method that collects all the current results of a running
568        client job into the results dir. By default does nothing as no
569        client job is running, but when running a client job you can override
570        this with something that will actually do something. """
571
572        # make an effort to wait for the machine to come up
573        try:
574            self.host.wait_up(timeout=30)
575        except error.AutoservError:
576            # don't worry about any errors, we'll try and
577            # get the results anyway
578            pass
579
580
581        # Copy all dirs in default to results_dir
582        try:
583            keyval_path = self._prepare_for_copying_logs()
584            self.host.get_file(self.client_results_dir + '/',
585                               self.server_results_dir)
586            self._process_copied_logs(keyval_path)
587            self._postprocess_copied_logs()
588        except Exception:
589            # well, don't stop running just because we couldn't get logs
590            print "Unexpected error copying test result logs, continuing ..."
591            traceback.print_exc(file=sys.stdout)
592
593
594    def _prepare_for_copying_logs(self):
595        server_keyval = os.path.join(self.server_results_dir, 'keyval')
596        if not os.path.exists(server_keyval):
597            # Client-side keyval file can be copied directly
598            return
599
600        # Copy client-side keyval to temporary location
601        suffix = '.keyval_%s' % self.host.hostname
602        fd, keyval_path = tempfile.mkstemp(suffix)
603        os.close(fd)
604        try:
605            client_keyval = os.path.join(self.client_results_dir, 'keyval')
606            try:
607                self.host.get_file(client_keyval, keyval_path)
608            finally:
609                # We will squirrel away the client side keyval
610                # away and move it back when we are done
611                remote_temp_dir = self.host.get_tmp_dir()
612                self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
613                self.host.run('mv %s %s' % (client_keyval,
614                                            self.temp_keyval_path))
615        except (error.AutoservRunError, error.AutoservSSHTimeout):
616            print "Prepare for copying logs failed"
617        return keyval_path
618
619
620    def _process_copied_logs(self, keyval_path):
621        if not keyval_path:
622            # Client-side keyval file was copied directly
623            return
624
625        # Append contents of keyval_<host> file to keyval file
626        try:
627            # Read in new and old keyval files
628            new_keyval = utils.read_keyval(keyval_path)
629            old_keyval = utils.read_keyval(self.server_results_dir)
630            # 'Delete' from new keyval entries that are in both
631            tmp_keyval = {}
632            for key, val in new_keyval.iteritems():
633                if key not in old_keyval:
634                    tmp_keyval[key] = val
635            # Append new info to keyval file
636            utils.write_keyval(self.server_results_dir, tmp_keyval)
637            # Delete keyval_<host> file
638            os.remove(keyval_path)
639        except IOError:
640            print "Process copied logs failed"
641
642
643    def _postprocess_copied_logs(self):
644        # we can now put our keyval file back
645        client_keyval = os.path.join(self.client_results_dir, 'keyval')
646        try:
647            self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
648        except Exception:
649            pass
650
651
652
653# a file-like object for catching stderr from an autotest client and
654# extracting status logs from it
655class client_logger(object):
656    """Partial file object to write to both stdout and
657    the status log file.  We only implement those methods
658    utils.run() actually calls.
659
660    Note that this class is fairly closely coupled with server_job, as it
661    uses special job._ methods to actually carry out the loggging.
662    """
663    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
664    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
665    extract_indent = re.compile(r"^(\t*).*$")
666
667    def __init__(self, host, tag, server_results_dir):
668        self.host = host
669        self.job = host.job
670        self.log_collector = log_collector(host, tag, server_results_dir)
671        self.leftover = ""
672        self.last_line = ""
673        self.logs = {}
674
675
676    def _process_log_dict(self, log_dict):
677        log_list = log_dict.pop("logs", [])
678        for key in sorted(log_dict.iterkeys()):
679            log_list += self._process_log_dict(log_dict.pop(key))
680        return log_list
681
682
683    def _process_logs(self):
684        """Go through the accumulated logs in self.log and print them
685        out to stdout and the status log. Note that this processes
686        logs in an ordering where:
687
688        1) logs to different tags are never interleaved
689        2) logs to x.y come before logs to x.y.z for all z
690        3) logs to x.y come before x.z whenever y < z
691
692        Note that this will in general not be the same as the
693        chronological ordering of the logs. However, if a chronological
694        ordering is desired that one can be reconstructed from the
695        status log by looking at timestamp lines."""
696        log_list = self._process_log_dict(self.logs)
697        for line in log_list:
698            self.job._record_prerendered(line + '\n')
699        if log_list:
700            self.last_line = log_list[-1]
701
702
703    def _process_quoted_line(self, tag, line):
704        """Process a line quoted with an AUTOTEST_STATUS flag. If the
705        tag is blank then we want to push out all the data we've been
706        building up in self.logs, and then the newest line. If the
707        tag is not blank, then push the line into the logs for handling
708        later."""
709        print line
710        if tag == "":
711            self._process_logs()
712            self.job._record_prerendered(line + '\n')
713            self.last_line = line
714        else:
715            tag_parts = [int(x) for x in tag.split(".")]
716            log_dict = self.logs
717            for part in tag_parts:
718                log_dict = log_dict.setdefault(part, {})
719            log_list = log_dict.setdefault("logs", [])
720            log_list.append(line)
721
722
723    def _process_line(self, line):
724        """Write out a line of data to the appropriate stream. Status
725        lines sent by autotest will be prepended with
726        "AUTOTEST_STATUS", and all other lines are ssh error
727        messages."""
728        status_match = self.status_parser.search(line)
729        test_complete_match = self.test_complete_parser.search(line)
730        if status_match:
731            tag, line = status_match.groups()
732            self._process_quoted_line(tag, line)
733        elif test_complete_match:
734            fifo_path, = test_complete_match.groups()
735            self.log_collector.collect_client_job_results()
736            self.host.run("echo A > %s" % fifo_path)
737        else:
738            print line
739
740
741    def _format_warnings(self, last_line, warnings):
742        # use the indentation of whatever the last log line was
743        indent = self.extract_indent.match(last_line).group(1)
744        # if the last line starts a new group, add an extra indent
745        if last_line.lstrip('\t').startswith("START\t"):
746            indent += '\t'
747        return [self.job._render_record("WARN", None, None, msg,
748                                        timestamp, indent).rstrip('\n')
749                for timestamp, msg in warnings]
750
751
752    def _process_warnings(self, last_line, log_dict, warnings):
753        if log_dict.keys() in ([], ["logs"]):
754            # there are no sub-jobs, just append the warnings here
755            warnings = self._format_warnings(last_line, warnings)
756            log_list = log_dict.setdefault("logs", [])
757            log_list += warnings
758            for warning in warnings:
759                sys.stdout.write(warning + '\n')
760        else:
761            # there are sub-jobs, so put the warnings in there
762            log_list = log_dict.get("logs", [])
763            if log_list:
764                last_line = log_list[-1]
765            for key in sorted(log_dict.iterkeys()):
766                if key != "logs":
767                    self._process_warnings(last_line,
768                                           log_dict[key],
769                                           warnings)
770
771
772    def write(self, data):
773        # first check for any new console warnings
774        warnings = self.job._read_warnings()
775        self._process_warnings(self.last_line, self.logs, warnings)
776        # now process the newest data written out
777        data = self.leftover + data
778        lines = data.split("\n")
779        # process every line but the last one
780        for line in lines[:-1]:
781            self._process_line(line)
782        # save the last line for later processing
783        # since we may not have the whole line yet
784        self.leftover = lines[-1]
785
786
787    def flush(self):
788        sys.stdout.flush()
789
790
791    def close(self):
792        if self.leftover:
793            self._process_line(self.leftover)
794        self._process_logs()
795        self.flush()
796
797
798SiteAutotest = client_utils.import_site_class(
799    __file__, "autotest_lib.server.site_autotest", "SiteAutotest",
800    BaseAutotest)
801
802class Autotest(SiteAutotest):
803    pass
804