autotest.py revision d99d3b27fb076377354fa388e0a81e97bda443b8
1# Copyright 2007 Google Inc. Released under the GPL v2
2
3import re, os, sys, traceback, subprocess, tempfile, time, pickle, glob
4from autotest_lib.server import installable_object, utils
5from autotest_lib.client.common_lib import log, error, debug
6from autotest_lib.client.common_lib import global_config, packages
7
# Locations of the autotest client tree used by the svn checkout fallback
# in BaseAutotest._install(): the svn:// URL is tried first and the
# http:// URL is used if that checkout fails.
AUTOTEST_SVN  = 'svn://test.kernel.org/autotest/trunk/client'
AUTOTEST_HTTP = 'http://test.kernel.org/svn/autotest/trunk/client'

# Timeouts for powering down and up respectively
# (BOOT_TIME is reported in seconds by _Run._wait_for_reboot(); the other
# values are presumably seconds as well -- TODO confirm)
HALT_TIME = 300
BOOT_TIME = 1800
# How long _Run.execute_control() waits for the client to come back up
# after the client terminated unexpectedly
CRASH_RECOVERY_TIME = 9000
15
16
17class BaseAutotest(installable_object.InstallableObject):
18    """
19    This class represents the Autotest program.
20
21    Autotest is used to run tests automatically and collect the results.
22    It also supports profilers.
23
24    Implementation details:
25    This is a leaf class in an abstract class hierarchy, it must
26    implement the unimplemented methods in parent classes.
27    """
28
29    def __init__(self, host = None):
30        self.host = host
31        self.got = False
32        self.installed = False
33        self.serverdir = utils.get_server_dir()
34        super(BaseAutotest, self).__init__()
35        self.logger = debug.get_logger(module='server')
36
37
38    install_in_tmpdir = False
39    @classmethod
40    def set_install_in_tmpdir(cls, flag):
41        """ Sets a flag that controls whether or not Autotest should by
42        default be installed in a "standard" directory (e.g.
43        /home/autotest, /usr/local/autotest) or a temporary directory. """
44        cls.install_in_tmpdir = flag
45
46
47    def _get_install_dir(self, host):
48        """ Determines the location where autotest should be installed on
49        host. If self.install_in_tmpdir is set, it will return a unique
50        temporary directory that autotest can be installed in. """
51        try:
52            autodir = _get_autodir(host)
53        except error.AutotestRunError:
54            autodir = '/usr/local/autotest'
55        if self.install_in_tmpdir:
56            autodir = host.get_tmp_dir(parent=autodir)
57        return autodir
58
59
60    @log.record
61    def install(self, host=None, autodir=None):
62        self._install(host=host, autodir=autodir)
63
64
65    def install_base(self, host=None, autodir=None):
66        """ Performs a lightweight autotest install. Useful for when you
67        want to run some client-side code but don't want to pay the cost
68        of a full installation. """
69        self._install(host=host, autodir=autodir, lightweight=True)
70
71
72    def _install(self, host=None, autodir=None, lightweight=False):
73        """
74        Install autotest.  If get() was not called previously, an
75        attempt will be made to install from the autotest svn
76        repository.
77
78        Args:
79            host: a Host instance on which autotest will be installed
80            autodir: location on the remote host to install to
81            lightweight: exclude tests, deps and profilers, if possible
82
83        Raises:
84            AutoservError: if a tarball was not specified and
85                the target host does not have svn installed in its path"""
86        if not host:
87            host = self.host
88        if not self.got:
89            self.get()
90        host.wait_up(timeout=30)
91        host.setup()
92        print "Installing autotest on %s" % host.hostname
93
94        # set up the autotest directory on the remote machine
95        if not autodir:
96            autodir = self._get_install_dir(host)
97        host.set_autodir(autodir)
98        host.run('mkdir -p "%s"' % utils.sh_escape(autodir))
99
100        # Fetch the autotest client from the nearest repository
101        try:
102            c = global_config.global_config
103            repos = c.get_config_value("PACKAGES", 'fetch_location', type=list)
104            pkgmgr = packages.PackageManager(autodir, hostname=host.hostname,
105                          repo_urls=repos,
106                          do_locking=False,
107                          run_function=host.run,
108                          run_function_dargs=dict(timeout=600))
109            # The packages dir is used to store all the packages that
110            # are fetched on that client. (for the tests,deps etc.
111            # too apart from the client)
112            pkg_dir = os.path.join(autodir, 'packages')
113            # clean up the autodir except for the packages directory
114            host.run('cd %s && ls | grep -v "^packages$"'
115                     ' | xargs rm -rf && rm -rf .[^.]*' % autodir)
116            pkgmgr.install_pkg('autotest', 'client', pkg_dir, autodir,
117                               preserve_install_dir=True)
118            self.installed = True
119            return
120        except global_config.ConfigError, e:
121            print ("Could not install autotest using the"
122                   " packaging system %s" %  e)
123        except (packages.PackageInstallError, error.AutoservRunError), e:
124            print "Could not install autotest from %s : %s " % (repos, e)
125
126
127        # try to install from file or directory
128        if self.source_material:
129            if os.path.isdir(self.source_material):
130                # Copy autotest recursively
131                if lightweight:
132                    dirs_to_exclude = set(["tests", "site_tests", "deps",
133                                           "tools", "profilers"])
134                    light_files = [os.path.join(self.source_material, f)
135                                   for f in os.listdir(self.source_material)
136                                   if f not in dirs_to_exclude]
137                    host.send_file(light_files, autodir, delete_dest=True)
138
139                    # create empty dirs for all the stuff we excluded
140                    commands = []
141                    for path in dirs_to_exclude:
142                        abs_path = os.path.join(autodir, path)
143                        abs_path = utils.sh_escape(abs_path)
144                        commands.append("mkdir -p '%s'" % abs_path)
145                    host.run(';'.join(commands))
146                else:
147                    host.send_file(self.source_material, autodir,
148                                   delete_dest=True)
149            else:
150                # Copy autotest via tarball
151                e_msg = 'Installation method not yet implemented!'
152                raise NotImplementedError(e_msg)
153            print "Installation of autotest completed"
154            self.installed = True
155            return
156
157        # if that fails try to install using svn
158        if utils.run('which svn').exit_status:
159            raise error.AutoservError('svn not found on target machine: %s'
160                                                                   % host.name)
161        try:
162            host.run('svn checkout %s %s' % (AUTOTEST_SVN, autodir))
163        except error.AutoservRunError, e:
164            host.run('svn checkout %s %s' % (AUTOTEST_HTTP, autodir))
165        print "Installation of autotest completed"
166        self.installed = True
167
168
169    def get(self, location = None):
170        if not location:
171            location = os.path.join(self.serverdir, '../client')
172            location = os.path.abspath(location)
173        # If there's stuff run on our client directory already, it
174        # can cause problems. Try giving it a quick clean first.
175        cwd = os.getcwd()
176        os.chdir(location)
177        os.system('tools/make_clean')
178        os.chdir(cwd)
179        super(BaseAutotest, self).get(location)
180        self.got = True
181
182
183    def run(self, control_file, results_dir = '.', host = None,
184            timeout=None, tag=None, parallel_flag=False, background=False):
185        """
186        Run an autotest job on the remote machine.
187
188        Args:
189                control_file: an open file-like-obj of the control file
190                results_dir: a str path where the results should be stored
191                        on the local filesystem
192                host: a Host instance on which the control file should
193                        be run
194                tag: tag name for the client side instance of autotest
195                parallel_flag: flag set when multiple jobs are run at the
196                          same time
197                background: indicates that the client should be launched as
198                            a background job; the code calling run will be
199                            responsible for monitoring the client and
200                            collecting the results
201        Raises:
202                AutotestRunError: if there is a problem executing
203                        the control file
204        """
205        host = self._get_host_and_setup(host)
206        results_dir = os.path.abspath(results_dir)
207
208        if tag:
209            results_dir = os.path.join(results_dir, tag)
210
211        atrun = _Run(host, results_dir, tag, parallel_flag, background)
212        self._do_run(control_file, results_dir, host, atrun, timeout)
213
214
215    def _get_host_and_setup(self, host):
216        if not host:
217            host = self.host
218        if not self.installed:
219            self.install(host)
220
221        host.wait_up(timeout=30)
222        return host
223
224
225    def _do_run(self, control_file, results_dir, host, atrun, timeout):
226        try:
227            atrun.verify_machine()
228        except:
229            print "Verify failed on %s. Reinstalling autotest" % host.hostname
230            self.install(host)
231        atrun.verify_machine()
232        debug = os.path.join(results_dir, 'debug')
233        try:
234            os.makedirs(debug)
235        except Exception:
236            pass
237
238        delete_file_list = [atrun.remote_control_file,
239                            atrun.remote_control_file + '.state',
240                            atrun.manual_control_file,
241                            atrun.manual_control_file + '.state']
242        cmd = ';'.join('rm -f ' + control for control in delete_file_list)
243        host.run(cmd, ignore_status=True)
244
245        tmppath = utils.get(control_file)
246
247        cfile = "job.default_boot_tag(%r)\n" % host.job.last_boot_tag
248        cfile += "job.default_test_cleanup(%r)\n" % host.job.run_test_cleanup
249
250        # If the packaging system is being used, add the repository list.
251        try:
252            c = global_config.global_config
253            repos = c.get_config_value("PACKAGES", 'fetch_location', type=list)
254            pkgmgr = packages.PackageManager('autotest', hostname=host.hostname,
255                                             repo_urls=repos)
256            cfile += 'job.add_repository(%s)\n' % pkgmgr.repo_urls
257        except global_config.ConfigError, e:
258            pass
259
260        cfile += open(tmppath).read()
261        open(tmppath, "w").write(cfile)
262
263        # Create and copy state file to remote_control_file + '.state'
264        sysinfo_state = {"__sysinfo": host.job.sysinfo.serialize()}
265        state_file = self._create_state_file(host.job, sysinfo_state)
266        host.send_file(state_file, atrun.remote_control_file + '.state')
267        os.remove(state_file)
268
269        # Copy control_file to remote_control_file on the host
270        host.send_file(tmppath, atrun.remote_control_file)
271        if os.path.abspath(tmppath) != os.path.abspath(control_file):
272            os.remove(tmppath)
273
274        try:
275            atrun.execute_control(timeout=timeout)
276        finally:
277            if not atrun.background:
278                collector = log_collector(host, atrun.tag, results_dir)
279                collector.collect_client_job_results()
280                self._process_client_state_file(host, atrun, results_dir)
281
282
283    def _create_state_file(self, job, state_dict):
284        """ Create a state file from a dictionary. Returns the path of the
285        state file. """
286        fd, path = tempfile.mkstemp(dir=job.tmpdir)
287        state_file = os.fdopen(fd, "w")
288        pickle.dump(state_dict, state_file)
289        state_file.close()
290        return path
291
292
293    def _process_client_state_file(self, host, atrun, results_dir):
294        state_file = os.path.basename(atrun.remote_control_file) + ".state"
295        state_path = os.path.join(results_dir, state_file)
296        try:
297            state_dict = pickle.load(open(state_path))
298        except Exception, e:
299            msg = "Ignoring error while loading client job state file: %s" % e
300            self.logger.warning(msg)
301            state_dict = {}
302
303        # clear out the state file
304        # TODO: stash the file away somewhere useful instead
305        try:
306            os.remove(state_path)
307        except Exception:
308            pass
309
310        msg = "Persistent state variables pulled back from %s: %s"
311        msg %= (host.hostname, state_dict)
312        print msg
313
314        if "__run_test_cleanup" in state_dict:
315            if state_dict["__run_test_cleanup"]:
316                host.job.enable_test_cleanup()
317            else:
318                host.job.disable_test_cleanup()
319
320        if "__last_boot_tag" in state_dict:
321            host.job.last_boot_tag = state_dict["__last_boot_tag"]
322
323        if "__sysinfo" in state_dict:
324            host.job.sysinfo.deserialize(state_dict["__sysinfo"])
325
326
327    def run_timed_test(self, test_name, results_dir='.', host=None,
328                       timeout=None, tag=None, *args, **dargs):
329        """
330        Assemble a tiny little control file to just run one test,
331        and run it as an autotest client-side test
332        """
333        if not host:
334            host = self.host
335        if not self.installed:
336            self.install(host)
337        opts = ["%s=%s" % (o[0], repr(o[1])) for o in dargs.items()]
338        cmd = ", ".join([repr(test_name)] + map(repr, args) + opts)
339        control = "job.run_test(%s)\n" % cmd
340        self.run(control, results_dir, host, timeout=timeout, tag=tag)
341
342
343    def run_test(self, test_name, results_dir='.', host=None, tag=None,
344                 *args, **dargs):
345        self.run_timed_test(test_name, results_dir, host, timeout=None,
346                            tag=tag, *args, **dargs)
347
348
349class _Run(object):
350    """
351    Represents a run of autotest control file.  This class maintains
352    all the state necessary as an autotest control file is executed.
353
354    It is not intended to be used directly, rather control files
355    should be run using the run method in Autotest.
356    """
357    def __init__(self, host, results_dir, tag, parallel_flag, background):
358        self.host = host
359        self.results_dir = results_dir
360        self.env = host.env
361        self.tag = tag
362        self.parallel_flag = parallel_flag
363        self.background = background
364        self.autodir = _get_autodir(self.host)
365        control = os.path.join(self.autodir, 'control')
366        if tag:
367            control += '.' + tag
368        self.manual_control_file = control
369        self.remote_control_file = control + '.autoserv'
370        self.logger = debug.get_logger(module='server')
371
372
373    def verify_machine(self):
374        binary = os.path.join(self.autodir, 'bin/autotest')
375        try:
376            self.host.run('ls %s > /dev/null 2>&1' % binary)
377        except:
378            raise "Autotest does not appear to be installed"
379
380        if not self.parallel_flag:
381            tmpdir = os.path.join(self.autodir, 'tmp')
382            download = os.path.join(self.autodir, 'tests/download')
383            self.host.run('umount %s' % tmpdir, ignore_status=True)
384            self.host.run('umount %s' % download, ignore_status=True)
385
386    def get_full_cmd(self, section):
387        # build up the full command we want to run over the host
388        cmd = [os.path.join(self.autodir, 'bin/autotest_client')]
389        if not self.background:
390            cmd.append('-H autoserv')
391        if section > 0:
392            cmd.append('-c')
393        if self.tag:
394            cmd.append('-t %s' % self.tag)
395        if self.host.job.use_external_logging():
396            cmd.append('-l')
397        cmd.append(self.remote_control_file)
398        if self.background:
399            cmd = ['nohup'] + cmd + ['>/dev/null 2>/dev/null &']
400        return ' '.join(cmd)
401
402
403    def get_client_log(self, section):
404        """ Find what the "next" client.log.* file should be and open it. """
405        debug_dir = os.path.join(self.results_dir, "debug")
406        client_logs = glob.glob(os.path.join(debug_dir, "client.log.*"))
407        next_log = os.path.join(debug_dir, "client.log.%d" % len(client_logs))
408        return open(next_log, "w", 0)
409
410
411    def execute_section(self, section, timeout, stderr_redirector):
412        print "Executing %s/bin/autotest %s/control phase %d" % \
413                                (self.autodir, self.autodir, section)
414
415        full_cmd = self.get_full_cmd(section)
416        client_log = self.get_client_log(section)
417
418        try:
419            old_resultdir = self.host.job.resultdir
420            self.host.job.resultdir = self.results_dir
421            result = self.host.run(full_cmd, ignore_status=True,
422                                   timeout=timeout,
423                                   stdout_tee=client_log,
424                                   stderr_tee=stderr_redirector)
425        finally:
426            self.host.job.resultdir = old_resultdir
427
428        if result.exit_status == 1:
429            raise error.AutotestRunError("client job was aborted")
430        if not self.background and not result.stderr:
431            raise error.AutotestRunError(
432                "execute_section: %s failed to return anything\n"
433                "stdout:%s\n" % (full_cmd, result.stdout))
434
435        return stderr_redirector.last_line
436
437
438    def _wait_for_reboot(self):
439        self.logger.info("Client is rebooting")
440        self.logger.info("Waiting for client to halt")
441        if not self.host.wait_down(HALT_TIME):
442            err = "%s failed to shutdown after %d"
443            err %= (self.host.hostname, HALT_TIME)
444            raise error.AutotestRunError(err)
445        self.logger.info("Client down, waiting for restart")
446        if not self.host.wait_up(BOOT_TIME):
447            # since reboot failed
448            # hardreset the machine once if possible
449            # before failing this control file
450            warning = "%s did not come back up, hard resetting"
451            warning %= self.host.hostname
452            self.logger.warning(warning)
453            try:
454                self.host.hardreset(wait=False)
455            except (AttributeError, error.AutoservUnsupportedError):
456                warning = "Hard reset unsupported on %s"
457                warning %= self.host.hostname
458                self.logger.warning(warning)
459            raise error.AutotestRunError("%s failed to boot after %ds" %
460                                         (self.host.hostname, BOOT_TIME))
461        self.host.reboot_followup()
462
463
464    def execute_control(self, timeout=None):
465        section = 0
466        start_time = time.time()
467
468        logger = client_logger(self.host, self.tag, self.results_dir)
469        try:
470            while not timeout or time.time() < start_time + timeout:
471                if timeout:
472                    section_timeout = start_time + timeout - time.time()
473                else:
474                    section_timeout = None
475                last = self.execute_section(section, section_timeout,
476                                            logger)
477                if self.background:
478                    return
479                section += 1
480                if re.match(r'^END .*\t----\t----\t.*$', last):
481                    print "Client complete"
482                    return
483                elif re.match('^\t*GOOD\t----\treboot\.start.*$', last):
484                    try:
485                        self._wait_for_reboot()
486                    except error.AutotestRunError, e:
487                        self.host.job.record("ABORT", None, "reboot", str(e))
488                        self.host.job.record("END ABORT", None, None, str(e))
489                        raise
490                    continue
491
492                # if we reach here, something unexpected happened
493                msg = "Autotest client terminated unexpectedly"
494                self.host.job.record("END ABORT", None, None, msg)
495
496                # give the client machine a chance to recover from a crash
497                self.host.wait_up(CRASH_RECOVERY_TIME)
498                msg = ("Aborting - unexpected final status message from "
499                       "client: %s\n") % last
500                raise error.AutotestRunError(msg)
501        finally:
502            logger.close()
503
504        # should only get here if we timed out
505        assert timeout
506        raise error.AutotestTimeoutError()
507
508
509def _get_autodir(host):
510    autodir = host.get_autodir()
511    if autodir:
512        return autodir
513    try:
514        # There's no clean way to do this. readlink may not exist
515        cmd = "python -c 'import os,sys; print os.readlink(sys.argv[1])' /etc/autotest.conf 2> /dev/null"
516        autodir = os.path.dirname(host.run(cmd).stdout)
517        if autodir:
518            return autodir
519    except error.AutoservRunError:
520        pass
521    for path in ['/usr/local/autotest', '/home/autotest']:
522        try:
523            host.run('ls %s > /dev/null 2>&1' %
524                     os.path.join(path, 'bin/autotest'))
525            return path
526        except error.AutoservRunError:
527            pass
528    raise error.AutotestRunError("Cannot figure out autotest directory")
529
530
531class log_collector(object):
532    def __init__(self, host, client_tag, results_dir):
533        self.host = host
534        if not client_tag:
535            client_tag = "default"
536        self.client_results_dir = os.path.join(host.get_autodir(), "results",
537                                               client_tag)
538        self.server_results_dir = results_dir
539
540
541    def collect_client_job_results(self):
542        """ A method that collects all the current results of a running
543        client job into the results dir. By default does nothing as no
544        client job is running, but when running a client job you can override
545        this with something that will actually do something. """
546
547        # make an effort to wait for the machine to come up
548        try:
549            self.host.wait_up(timeout=30)
550        except error.AutoservError:
551            # don't worry about any errors, we'll try and
552            # get the results anyway
553            pass
554
555
556        # Copy all dirs in default to results_dir
557        try:
558            keyval_path = self._prepare_for_copying_logs()
559            self.host.get_file(self.client_results_dir + '/',
560                               self.server_results_dir)
561            self._process_copied_logs(keyval_path)
562            self._postprocess_copied_logs()
563        except Exception:
564            # well, don't stop running just because we couldn't get logs
565            print "Unexpected error copying test result logs, continuing ..."
566            traceback.print_exc(file=sys.stdout)
567
568
569    def _prepare_for_copying_logs(self):
570        server_keyval = os.path.join(self.server_results_dir, 'keyval')
571        if not os.path.exists(server_keyval):
572            # Client-side keyval file can be copied directly
573            return
574
575        # Copy client-side keyval to temporary location
576        suffix = '.keyval_%s' % self.host.hostname
577        fd, keyval_path = tempfile.mkstemp(suffix)
578        os.close(fd)
579        try:
580            client_keyval = os.path.join(self.client_results_dir, 'keyval')
581            try:
582                self.host.get_file(client_keyval, keyval_path)
583            finally:
584                # We will squirrel away the client side keyval
585                # away and move it back when we are done
586                remote_temp_dir = self.host.get_tmp_dir()
587                self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval")
588                self.host.run('mv %s %s' % (client_keyval,
589                                            self.temp_keyval_path))
590        except (error.AutoservRunError, error.AutoservSSHTimeout):
591            print "Prepare for copying logs failed"
592        return keyval_path
593
594
595    def _process_copied_logs(self, keyval_path):
596        if not keyval_path:
597            # Client-side keyval file was copied directly
598            return
599
600        # Append contents of keyval_<host> file to keyval file
601        try:
602            # Read in new and old keyval files
603            new_keyval = utils.read_keyval(keyval_path)
604            old_keyval = utils.read_keyval(self.server_results_dir)
605            # 'Delete' from new keyval entries that are in both
606            tmp_keyval = {}
607            for key, val in new_keyval.iteritems():
608                if key not in old_keyval:
609                    tmp_keyval[key] = val
610            # Append new info to keyval file
611            utils.write_keyval(self.server_results_dir, tmp_keyval)
612            # Delete keyval_<host> file
613            os.remove(keyval_path)
614        except IOError:
615            print "Process copied logs failed"
616
617
618    def _postprocess_copied_logs(self):
619        # we can now put our keyval file back
620        client_keyval = os.path.join(self.client_results_dir, 'keyval')
621        try:
622            self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval))
623        except Exception:
624            pass
625
626
627
628# a file-like object for catching stderr from an autotest client and
629# extracting status logs from it
630class client_logger(object):
631    """Partial file object to write to both stdout and
632    the status log file.  We only implement those methods
633    utils.run() actually calls.
634
635    Note that this class is fairly closely coupled with server_job, as it
    uses special job._ methods to actually carry out the logging.
637    """
638    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
639    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
640    extract_indent = re.compile(r"^(\t*).*$")
641
642    def __init__(self, host, tag, server_results_dir):
643        self.host = host
644        self.job = host.job
645        self.log_collector = log_collector(host, tag, server_results_dir)
646        self.leftover = ""
647        self.last_line = ""
648        self.logs = {}
649
650
651    def _process_log_dict(self, log_dict):
652        log_list = log_dict.pop("logs", [])
653        for key in sorted(log_dict.iterkeys()):
654            log_list += self._process_log_dict(log_dict.pop(key))
655        return log_list
656
657
658    def _process_logs(self):
659        """Go through the accumulated logs in self.log and print them
660        out to stdout and the status log. Note that this processes
661        logs in an ordering where:
662
663        1) logs to different tags are never interleaved
664        2) logs to x.y come before logs to x.y.z for all z
665        3) logs to x.y come before x.z whenever y < z
666
667        Note that this will in general not be the same as the
668        chronological ordering of the logs. However, if a chronological
669        ordering is desired that one can be reconstructed from the
670        status log by looking at timestamp lines."""
671        log_list = self._process_log_dict(self.logs)
672        for line in log_list:
673            self.job._record_prerendered(line + '\n')
674        if log_list:
675            self.last_line = log_list[-1]
676
677
678    def _process_quoted_line(self, tag, line):
679        """Process a line quoted with an AUTOTEST_STATUS flag. If the
680        tag is blank then we want to push out all the data we've been
681        building up in self.logs, and then the newest line. If the
682        tag is not blank, then push the line into the logs for handling
683        later."""
684        print line
685        if tag == "":
686            self._process_logs()
687            self.job._record_prerendered(line + '\n')
688            self.last_line = line
689        else:
690            tag_parts = [int(x) for x in tag.split(".")]
691            log_dict = self.logs
692            for part in tag_parts:
693                log_dict = log_dict.setdefault(part, {})
694            log_list = log_dict.setdefault("logs", [])
695            log_list.append(line)
696
697
698    def _process_line(self, line):
699        """Write out a line of data to the appropriate stream. Status
700        lines sent by autotest will be prepended with
701        "AUTOTEST_STATUS", and all other lines are ssh error
702        messages."""
703        status_match = self.status_parser.search(line)
704        test_complete_match = self.test_complete_parser.search(line)
705        if status_match:
706            tag, line = status_match.groups()
707            self._process_quoted_line(tag, line)
708        elif test_complete_match:
709            fifo_path, = test_complete_match.groups()
710            self.log_collector.collect_client_job_results()
711            self.host.run("echo A > %s" % fifo_path)
712        else:
713            print line
714
715
716    def _format_warnings(self, last_line, warnings):
717        # use the indentation of whatever the last log line was
718        indent = self.extract_indent.match(last_line).group(1)
719        # if the last line starts a new group, add an extra indent
720        if last_line.lstrip('\t').startswith("START\t"):
721            indent += '\t'
722        return [self.job._render_record("WARN", None, None, msg,
723                                        timestamp, indent).rstrip('\n')
724                for timestamp, msg in warnings]
725
726
727    def _process_warnings(self, last_line, log_dict, warnings):
728        if log_dict.keys() in ([], ["logs"]):
729            # there are no sub-jobs, just append the warnings here
730            warnings = self._format_warnings(last_line, warnings)
731            log_list = log_dict.setdefault("logs", [])
732            log_list += warnings
733            for warning in warnings:
734                sys.stdout.write(warning + '\n')
735        else:
736            # there are sub-jobs, so put the warnings in there
737            log_list = log_dict.get("logs", [])
738            if log_list:
739                last_line = log_list[-1]
740            for key in sorted(log_dict.iterkeys()):
741                if key != "logs":
742                    self._process_warnings(last_line,
743                                           log_dict[key],
744                                           warnings)
745
746
747    def write(self, data):
748        # first check for any new console warnings
749        warnings = self.job._read_warnings()
750        self._process_warnings(self.last_line, self.logs, warnings)
751        # now process the newest data written out
752        data = self.leftover + data
753        lines = data.split("\n")
754        # process every line but the last one
755        for line in lines[:-1]:
756            self._process_line(line)
757        # save the last line for later processing
758        # since we may not have the whole line yet
759        self.leftover = lines[-1]
760
761
762    def flush(self):
763        sys.stdout.flush()
764
765
766    def close(self):
767        if self.leftover:
768            self._process_line(self.leftover)
769        self._process_logs()
770        self.flush()
771
772
# site_autotest.py may be nonexistent or empty; make sure that an appropriate
# SiteAutotest class is created nevertheless
try:
    from site_autotest import SiteAutotest
except ImportError:
    class SiteAutotest(BaseAutotest):
        """Fallback used when SiteAutotest cannot be imported from
        site_autotest; adds nothing to BaseAutotest."""
780
781
class Autotest(SiteAutotest):
    """The Autotest class to use: SiteAutotest without any changes."""
784