autotest.py revision bd450640e5be8e64d244b65488fbfb725056b7f9
1# Copyright 2007 Google Inc. Released under the GPL v2
2
3import re, os, sys, traceback, subprocess, tempfile, time, pickle, glob, logging
4import getpass
5from autotest_lib.server import installable_object, utils
6from autotest_lib.client.common_lib import log, error, autotemp
7from autotest_lib.client.common_lib import global_config, packages
8from autotest_lib.client.common_lib import utils as client_utils
9
# Locations of the autotest client sources used by the svn install fallback.
# The native svn:// URL is tried first; the HTTP mirror is the retry.
AUTOTEST_SVN  = 'svn://test.kernel.org/autotest/trunk/client'
AUTOTEST_HTTP = 'http://test.kernel.org/svn/autotest/trunk/client'

# Timeouts in seconds: waiting for a host to power down (5 min), to come back
# up after a reboot (30 min), and to recover after an unexpected client crash
# (150 min).
HALT_TIME = 5 * 60
BOOT_TIME = 30 * 60
CRASH_RECOVERY_TIME = 150 * 60
17
18
19class BaseAutotest(installable_object.InstallableObject):
20    """
21    This class represents the Autotest program.
22
23    Autotest is used to run tests automatically and collect the results.
24    It also supports profilers.
25
26    Implementation details:
27    This is a leaf class in an abstract class hierarchy, it must
28    implement the unimplemented methods in parent classes.
29    """
30
31    def __init__(self, host = None):
32        self.host = host
33        self.got = False
34        self.installed = False
35        self.serverdir = utils.get_server_dir()
36        super(BaseAutotest, self).__init__()
37
38
39    install_in_tmpdir = False
40    @classmethod
41    def set_install_in_tmpdir(cls, flag):
42        """ Sets a flag that controls whether or not Autotest should by
43        default be installed in a "standard" directory (e.g.
44        /home/autotest, /usr/local/autotest) or a temporary directory. """
45        cls.install_in_tmpdir = flag
46
47
48    def _get_install_dir(self, host):
49        """ Determines the location where autotest should be installed on
50        host. If self.install_in_tmpdir is set, it will return a unique
51        temporary directory that autotest can be installed in. """
52        try:
53            autodir = _get_autodir(host)
54        except error.AutotestRunError:
55            autodir = '/usr/local/autotest'
56        if self.install_in_tmpdir:
57            autodir = host.get_tmp_dir(parent=autodir)
58        return autodir
59
60
61    @log.record
62    def install(self, host=None, autodir=None):
63        self._install(host=host, autodir=autodir)
64
65
66    @log.record
67    def install_full_client(self, host=None, autodir=None):
68        self._install(host=host, autodir=autodir, use_autoserv=False,
69                      use_packaging=False)
70
71
72    def install_no_autoserv(self, host=None, autodir=None):
73        self._install(host=host, autodir=autodir, use_autoserv=False)
74
75
76    def _install_using_packaging(self, host, autodir):
77        c = global_config.global_config
78        repos = c.get_config_value("PACKAGES", 'fetch_location', type=list,
79                                   default=[])
80        repos.reverse()
81        if not repos:
82            raise error.PackageInstallError("No repos to install an "
83                                            "autotest client from")
84        pkgmgr = packages.PackageManager(autodir, hostname=host.hostname,
85                                         repo_urls=repos,
86                                         do_locking=False,
87                                         run_function=host.run,
88                                         run_function_dargs=dict(timeout=600))
89        # The packages dir is used to store all the packages that
90        # are fetched on that client. (for the tests,deps etc.
91        # too apart from the client)
92        pkg_dir = os.path.join(autodir, 'packages')
93        # clean up the autodir except for the packages directory
94        host.run('cd %s && ls | grep -v "^packages$"'
95                 ' | xargs rm -rf && rm -rf .[^.]*' % autodir)
96        pkgmgr.install_pkg('autotest', 'client', pkg_dir, autodir,
97                           preserve_install_dir=True)
98        self.installed = True
99
100
101    def _install_using_send_file(self, host, autodir):
102        dirs_to_exclude = set(["tests", "site_tests", "deps", "profilers"])
103        light_files = [os.path.join(self.source_material, f)
104                       for f in os.listdir(self.source_material)
105                       if f not in dirs_to_exclude]
106        host.send_file(light_files, autodir, delete_dest=True)
107
108        # create empty dirs for all the stuff we excluded
109        commands = []
110        for path in dirs_to_exclude:
111            abs_path = os.path.join(autodir, path)
112            abs_path = utils.sh_escape(abs_path)
113            commands.append("mkdir -p '%s'" % abs_path)
114            commands.append("touch '%s'/__init__.py" % abs_path)
115        host.run(';'.join(commands))
116
117
118    def _install(self, host=None, autodir=None, use_autoserv=True,
119                 use_packaging=True):
120        """
121        Install autotest.  If get() was not called previously, an
122        attempt will be made to install from the autotest svn
123        repository.
124
125        @param host A Host instance on which autotest will be installed
126        @param autodir Location on the remote host to install to
127        @param use_autoserv Enable install modes that depend on the client
128            running with the autoserv harness
129        @param use_packaging Enable install modes that use the packaging system
130
131        @exception AutoservError if a tarball was not specified and
132            the target host does not have svn installed in its path
133        """
134        if not host:
135            host = self.host
136        if not self.got:
137            self.get()
138        host.wait_up(timeout=30)
139        host.setup()
140        logging.info("Installing autotest on %s", host.hostname)
141
142        # set up the autotest directory on the remote machine
143        if not autodir:
144            autodir = self._get_install_dir(host)
145        host.set_autodir(autodir)
146        host.run('mkdir -p %s' % utils.sh_escape(autodir))
147
148        # make sure there are no files in $AUTODIR/results
149        results_path = os.path.join(autodir, 'results')
150        host.run('rm -rf %s/*' % utils.sh_escape(results_path),
151                 ignore_status=True)
152
153        # Fetch the autotest client from the nearest repository
154        if use_packaging:
155            try:
156                self._install_using_packaging(host, autodir)
157                return
158            except global_config.ConfigError, e:
159                logging.info("Could not install autotest using the packaging "
160                             "system: %s",  e)
161            except (error.PackageInstallError, error.AutoservRunError), e:
162                logging.error("Could not install autotest from repos")
163
164        # try to install from file or directory
165        if self.source_material:
166            if os.path.isdir(self.source_material):
167                c = global_config.global_config
168                supports_autoserv_packaging = c.get_config_value(
169                    "PACKAGES", "serve_packages_from_autoserv", type=bool)
170                # Copy autotest recursively
171                if supports_autoserv_packaging and use_autoserv:
172                    self._install_using_send_file(host, autodir)
173                else:
174                    host.send_file(self.source_material, autodir,
175                                   delete_dest=True)
176            else:
177                # Copy autotest via tarball
178                e_msg = 'Installation method not yet implemented!'
179                raise NotImplementedError(e_msg)
180            logging.info("Installation of autotest completed")
181            self.installed = True
182            return
183
184        # if that fails try to install using svn
185        if utils.run('which svn').exit_status:
186            raise error.AutoservError('svn not found on target machine: %s'
187                                                                   % host.name)
188        try:
189            host.run('svn checkout %s %s' % (AUTOTEST_SVN, autodir))
190        except error.AutoservRunError, e:
191            host.run('svn checkout %s %s' % (AUTOTEST_HTTP, autodir))
192        logging.info("Installation of autotest completed")
193        self.installed = True
194
195
196    def uninstall(self, host=None):
197        """
198        Uninstall (i.e. delete) autotest. Removes the autotest client install
199        from the specified host.
200
201        @params host a Host instance from which the client will be removed
202        """
203        if not self.installed:
204            return
205        if not host:
206            host = self.host
207        autodir = host.get_autodir()
208        if not autodir:
209            return
210
211        # perform the actual uninstall
212        host.run("rm -rf %s" % utils.sh_escape(autodir), ignore_status=True)
213        host.set_autodir(None)
214        self.installed = False
215
216
217    def get(self, location = None):
218        if not location:
219            location = os.path.join(self.serverdir, '../client')
220            location = os.path.abspath(location)
221        # If there's stuff run on our client directory already, it
222        # can cause problems. Try giving it a quick clean first.
223        cwd = os.getcwd()
224        os.chdir(location)
225        os.system('tools/make_clean')
226        os.chdir(cwd)
227        super(BaseAutotest, self).get(location)
228        self.got = True
229
230
231    def run(self, control_file, results_dir='.', host=None, timeout=None,
232            tag=None, parallel_flag=False, background=False,
233            client_disconnect_timeout=1800, job_tag=''):
234        """
235        Run an autotest job on the remote machine.
236
237        @param control_file: An open file-like-obj of the control file.
238        @param results_dir: A str path where the results should be stored
239                on the local filesystem.
240        @param host: A Host instance on which the control file should
241                be run.
242        @param timeout: Maximum number of seconds to wait for the run or None.
243        @param tag: Tag name for the client side instance of autotest.
244        @param parallel_flag: Flag set when multiple jobs are run at the
245                same time.
246        @param background: Indicates that the client should be launched as
247                a background job; the code calling run will be responsible
248                for monitoring the client and collecting the results.
249        @param client_disconnect_timeout: Seconds to wait for the remote host
250                to come back after a reboot.  [default: 30 minutes]
251        @param job_tag: The scheduler's execution tag for this particular job
252                to pass on to the clients.  'job#-owner/hostgroupname'
253
254        @raises AutotestRunError: If there is a problem executing
255                the control file.
256        """
257        host = self._get_host_and_setup(host)
258        results_dir = os.path.abspath(results_dir)
259
260        if tag:
261            results_dir = os.path.join(results_dir, tag)
262
263        atrun = _Run(host, results_dir, tag, parallel_flag, background)
264        self._do_run(control_file, results_dir, host, atrun, timeout,
265                     client_disconnect_timeout, job_tag)
266
267
268    def _get_host_and_setup(self, host):
269        if not host:
270            host = self.host
271        if not self.installed:
272            self.install(host)
273
274        host.wait_up(timeout=30)
275        return host
276
277
278    def _do_run(self, control_file, results_dir, host, atrun, timeout,
279                client_disconnect_timeout, job_tag):
280        try:
281            atrun.verify_machine()
282        except:
283            logging.error("Verify failed on %s. Reinstalling autotest",
284                          host.hostname)
285            self.install(host)
286        atrun.verify_machine()
287        debug = os.path.join(results_dir, 'debug')
288        try:
289            os.makedirs(debug)
290        except Exception:
291            pass
292
293        delete_file_list = [atrun.remote_control_file,
294                            atrun.remote_control_file + '.state',
295                            atrun.manual_control_file,
296                            atrun.manual_control_file + '.state']
297        cmd = ';'.join('rm -f ' + control for control in delete_file_list)
298        host.run(cmd, ignore_status=True)
299
300        tmppath = utils.get(control_file)
301
302        # build up the initialization prologue for the control file
303        prologue_lines = []
304        prologue_lines.append("job.set_default_profile_only(%r)\n"
305                              % host.job.default_profile_only)
306        prologue_lines.append("job.default_boot_tag(%r)\n"
307                              % host.job.last_boot_tag)
308        prologue_lines.append("job.default_test_cleanup(%r)\n"
309                              % host.job.run_test_cleanup)
310        if job_tag:
311            prologue_lines.append("job.default_tag(%r)\n" % job_tag)
312
313        # If the packaging system is being used, add the repository list.
314        try:
315            c = global_config.global_config
316            repos = c.get_config_value("PACKAGES", 'fetch_location', type=list)
317            repos.reverse()  # high priority packages should be added last
318            pkgmgr = packages.PackageManager('autotest', hostname=host.hostname,
319                                             repo_urls=repos)
320            prologue_lines.append('job.add_repository(%s)\n' % repos)
321        except global_config.ConfigError, e:
322            pass
323
324        # on full-size installs, turn on any profilers the server is using
325        if not atrun.background:
326            running_profilers = host.job.profilers.add_log.iteritems()
327            for profiler, (args, dargs) in running_profilers:
328                call_args = [repr(profiler)]
329                call_args += [repr(arg) for arg in args]
330                call_args += ["%s=%r" % item for item in dargs.iteritems()]
331                prologue_lines.append("job.profilers.add(%s)\n"
332                                      % ", ".join(call_args))
333        cfile = "".join(prologue_lines)
334
335        cfile += open(tmppath).read()
336        open(tmppath, "w").write(cfile)
337
338        # Create and copy state file to remote_control_file + '.state'
339        sysinfo_state = {"__sysinfo": host.job.sysinfo.serialize()}
340        state_file = self._create_state_file(host.job, sysinfo_state)
341        host.send_file(state_file, atrun.remote_control_file + '.state')
342        os.remove(state_file)
343
344        # Copy control_file to remote_control_file on the host
345        host.send_file(tmppath, atrun.remote_control_file)
346        if os.path.abspath(tmppath) != os.path.abspath(control_file):
347            os.remove(tmppath)
348
349        atrun.execute_control(
350                timeout=timeout,
351                client_disconnect_timeout=client_disconnect_timeout)
352
353
354    def _create_state_file(self, job, state_dict):
355        """ Create a state file from a dictionary. Returns the path of the
356        state file. """
357        fd, path = tempfile.mkstemp(dir=job.tmpdir)
358        state_file = os.fdopen(fd, "w")
359        pickle.dump(state_dict, state_file)
360        state_file.close()
361        return path
362
363
364    def run_timed_test(self, test_name, results_dir='.', host=None,
365                       timeout=None, *args, **dargs):
366        """
367        Assemble a tiny little control file to just run one test,
368        and run it as an autotest client-side test
369        """
370        if not host:
371            host = self.host
372        if not self.installed:
373            self.install(host)
374        opts = ["%s=%s" % (o[0], repr(o[1])) for o in dargs.items()]
375        cmd = ", ".join([repr(test_name)] + map(repr, args) + opts)
376        control = "job.run_test(%s)\n" % cmd
377        self.run(control, results_dir, host, timeout=timeout)
378
379
380    def run_test(self, test_name, results_dir='.', host=None, *args, **dargs):
381        self.run_timed_test(test_name, results_dir, host, timeout=None,
382                            *args, **dargs)
383
384
385class _Run(object):
386    """
387    Represents a run of autotest control file.  This class maintains
388    all the state necessary as an autotest control file is executed.
389
390    It is not intended to be used directly, rather control files
391    should be run using the run method in Autotest.
392    """
393    def __init__(self, host, results_dir, tag, parallel_flag, background):
394        self.host = host
395        self.results_dir = results_dir
396        self.env = host.env
397        self.tag = tag
398        self.parallel_flag = parallel_flag
399        self.background = background
400        self.autodir = _get_autodir(self.host)
401        control = os.path.join(self.autodir, 'control')
402        if tag:
403            control += '.' + tag
404        self.manual_control_file = control
405        self.remote_control_file = control + '.autoserv'
406
407
408    def verify_machine(self):
409        binary = os.path.join(self.autodir, 'bin/autotest')
410        try:
411            self.host.run('ls %s > /dev/null 2>&1' % binary)
412        except:
413            raise "Autotest does not appear to be installed"
414
415        if not self.parallel_flag:
416            tmpdir = os.path.join(self.autodir, 'tmp')
417            download = os.path.join(self.autodir, 'tests/download')
418            self.host.run('umount %s' % tmpdir, ignore_status=True)
419            self.host.run('umount %s' % download, ignore_status=True)
420
421
422    def get_base_cmd_args(self, section):
423        args = ['--verbose']
424        if section > 0:
425            args.append('-c')
426        if self.tag:
427            args.append('-t %s' % self.tag)
428        if self.host.job.use_external_logging():
429            args.append('-l')
430        if self.host.hostname:
431            args.append('--hostname=%s' % self.host.hostname)
432        args.append('--user=%s' % getpass.getuser())
433
434        args.append(self.remote_control_file)
435        return args
436
437
438    def get_background_cmd(self, section):
439        cmd = ['nohup', os.path.join(self.autodir, 'bin/autotest_client')]
440        cmd += self.get_base_cmd_args(section)
441        cmd.append('>/dev/null 2>/dev/null &')
442        return ' '.join(cmd)
443
444
445    def get_daemon_cmd(self, section, monitor_dir):
446        cmd = ['nohup', os.path.join(self.autodir, 'bin/autotestd'),
447               monitor_dir, '-H autoserv']
448        cmd += self.get_base_cmd_args(section)
449        cmd.append('>/dev/null 2>/dev/null </dev/null &')
450        return ' '.join(cmd)
451
452
453    def get_monitor_cmd(self, monitor_dir, stdout_read, stderr_read):
454        cmd = [os.path.join(self.autodir, 'bin', 'autotestd_monitor'),
455               monitor_dir, str(stdout_read), str(stderr_read)]
456        return ' '.join(cmd)
457
458
459    def get_client_log(self, section):
460        """ Find what the "next" client.log.* file should be and open it. """
461        debug_dir = os.path.join(self.results_dir, "debug")
462        client_logs = glob.glob(os.path.join(debug_dir, "client.log.*"))
463        next_log = os.path.join(debug_dir, "client.log.%d" % len(client_logs))
464        return open(next_log, "w", 0)
465
466
467    @staticmethod
468    def is_client_job_finished(last_line):
469        return bool(re.match(r'^END .*\t----\t----\t.*$', last_line))
470
471
472    @staticmethod
473    def is_client_job_rebooting(last_line):
474        return bool(re.match(r'^\t*GOOD\t----\treboot\.start.*$', last_line))
475
476
477    def log_unexpected_abort(self, stderr_redirector):
478        stderr_redirector.flush_all_buffers()
479        msg = "Autotest client terminated unexpectedly"
480        self.host.job.record("END ABORT", None, None, msg)
481
482
483    def _execute_in_background(self, section, timeout):
484        full_cmd = self.get_background_cmd(section)
485        devnull = open(os.devnull, "w")
486
487        old_resultdir = self.host.job.resultdir
488        try:
489            self.host.job.resultdir = self.results_dir
490            result = self.host.run(full_cmd, ignore_status=True,
491                                   timeout=timeout,
492                                   stdout_tee=devnull,
493                                   stderr_tee=devnull)
494        finally:
495            self.host.job.resultdir = old_resultdir
496
497        return result
498
499
500    @staticmethod
501    def _strip_stderr_prologue(stderr):
502        """Strips the 'standard' prologue that get pre-pended to every
503        remote command and returns the text that was actually written to
504        stderr by the remote command."""
505        stderr_lines = stderr.split("\n")[1:]
506        if not stderr_lines:
507            return ""
508        elif stderr_lines[0].startswith("NOTE: autotestd_monitor"):
509            del stderr_lines[0]
510        return "\n".join(stderr_lines)
511
512
513    def _execute_daemon(self, section, timeout, stderr_redirector,
514                        client_disconnect_timeout):
515        monitor_dir = self.host.get_tmp_dir()
516        daemon_cmd = self.get_daemon_cmd(section, monitor_dir)
517        client_log = self.get_client_log(section)
518
519        stdout_read = stderr_read = 0
520        old_resultdir = self.host.job.resultdir
521        try:
522            self.host.job.resultdir = self.results_dir
523            self.host.run(daemon_cmd, ignore_status=True, timeout=timeout)
524            disconnect_warnings = []
525            while True:
526                monitor_cmd = self.get_monitor_cmd(monitor_dir, stdout_read,
527                                                   stderr_read)
528                try:
529                    result = self.host.run(monitor_cmd, ignore_status=True,
530                                           timeout=timeout,
531                                           stdout_tee=client_log,
532                                           stderr_tee=stderr_redirector)
533                except error.AutoservRunError, e:
534                    result = e.result_obj
535                    result.exit_status = None
536                    disconnect_warnings.append(e.description)
537
538                    stderr_redirector.log_warning(
539                        "Autotest client was disconnected: %s" % e.description,
540                        "NETWORK")
541                except error.AutoservSSHTimeout:
542                    result = utils.CmdResult(monitor_cmd, "", "", None, 0)
543                    stderr_redirector.log_warning(
544                        "Attempt to connect to Autotest client timed out",
545                        "NETWORK")
546
547                stdout_read += len(result.stdout)
548                stderr_read += len(self._strip_stderr_prologue(result.stderr))
549
550                if result.exit_status is not None:
551                    return result
552                elif not self.host.wait_up(client_disconnect_timeout):
553                    raise error.AutoservSSHTimeout(
554                        "client was disconnected, reconnect timed out")
555        finally:
556            self.host.job.resultdir = old_resultdir
557
558
559    def execute_section(self, section, timeout, stderr_redirector,
560                        client_disconnect_timeout):
561        logging.info("Executing %s/bin/autotest %s/control phase %d",
562                     self.autodir, self.autodir, section)
563
564        if self.background:
565            result = self._execute_in_background(section, timeout)
566        else:
567            result = self._execute_daemon(section, timeout, stderr_redirector,
568                                          client_disconnect_timeout)
569
570        last_line = stderr_redirector.last_line
571
572        # check if we failed hard enough to warrant an exception
573        if result.exit_status == 1:
574            err = error.AutotestRunError("client job was aborted")
575        elif not self.background and not result.stderr:
576            err = error.AutotestRunError(
577                "execute_section %s failed to return anything\n"
578                "stdout:%s\n" % (section, result.stdout))
579        else:
580            err = None
581
582        # log something if the client failed AND never finished logging
583        if err and not self.is_client_job_finished(last_line):
584            self.log_unexpected_abort(stderr_redirector)
585
586        if err:
587            raise err
588        else:
589            return stderr_redirector.last_line
590
591
592    def _wait_for_reboot(self):
593        logging.info("Client is rebooting")
594        logging.info("Waiting for client to halt")
595        if not self.host.wait_down(HALT_TIME):
596            err = "%s failed to shutdown after %d"
597            err %= (self.host.hostname, HALT_TIME)
598            raise error.AutotestRunError(err)
599        logging.info("Client down, waiting for restart")
600        if not self.host.wait_up(BOOT_TIME):
601            # since reboot failed
602            # hardreset the machine once if possible
603            # before failing this control file
604            warning = "%s did not come back up, hard resetting"
605            warning %= self.host.hostname
606            logging.warning(warning)
607            try:
608                self.host.hardreset(wait=False)
609            except (AttributeError, error.AutoservUnsupportedError):
610                warning = "Hard reset unsupported on %s"
611                warning %= self.host.hostname
612                logging.warning(warning)
613            raise error.AutotestRunError("%s failed to boot after %ds" %
614                                         (self.host.hostname, BOOT_TIME))
615        self.host.reboot_followup()
616
617
618    def _process_client_state_file(self):
619        state_file = os.path.basename(self.remote_control_file) + ".state"
620        state_path = os.path.join(self.results_dir, state_file)
621        try:
622            state_dict = pickle.load(open(state_path))
623        except Exception, e:
624            msg = "Ignoring error while loading client job state file: %s" % e
625            logging.warning(msg)
626            state_dict = {}
627
628        # clear out the state file
629        # TODO: stash the file away somewhere useful instead
630        try:
631            os.remove(state_path)
632        except Exception:
633            pass
634
635        logging.debug("Persistent state variables pulled back from %s: %s",
636                      self.host.hostname, state_dict)
637
638        if "__default_profile_only" in state_dict:
639            self.host.job.default_profile_only = (
640                state_dict["__default_profile_only"])
641
642        if "__run_test_cleanup" in state_dict:
643            if state_dict["__run_test_cleanup"]:
644                self.host.job.enable_test_cleanup()
645            else:
646                self.host.job.disable_test_cleanup()
647
648        if "__last_boot_tag" in state_dict:
649            self.host.job.last_boot_tag = state_dict["__last_boot_tag"]
650
651        if "__sysinfo" in state_dict:
652            self.host.job.sysinfo.deserialize(state_dict["__sysinfo"])
653
654
655    def execute_control(self, timeout=None, client_disconnect_timeout=None):
656        if not self.background:
657            collector = log_collector(self.host, self.tag, self.results_dir)
658            hostname = self.host.hostname
659            remote_results = collector.client_results_dir
660            local_results = collector.server_results_dir
661            self.host.job.add_client_log(hostname, remote_results,
662                                         local_results)
663
664        section = 0
665        start_time = time.time()
666
667        logger = client_logger(self.host, self.tag, self.results_dir)
668        try:
669            while not timeout or time.time() < start_time + timeout:
670                if timeout:
671                    section_timeout = start_time + timeout - time.time()
672                else:
673                    section_timeout = None
674                last = self.execute_section(section, section_timeout,
675                                            logger, client_disconnect_timeout)
676                if self.background:
677                    return
678                section += 1
679                if self.is_client_job_finished(last):
680                    logging.info("Client complete")
681                    return
682                elif self.is_client_job_rebooting(last):
683                    try:
684                        self._wait_for_reboot()
685                    except error.AutotestRunError, e:
686                        self.host.job.record("ABORT", None, "reboot", str(e))
687                        self.host.job.record("END ABORT", None, None, str(e))
688                        raise
689                    continue
690
691                # if we reach here, something unexpected happened
692                self.log_unexpected_abort(logger)
693
694                # give the client machine a chance to recover from a crash
695                self.host.wait_up(CRASH_RECOVERY_TIME)
696                msg = ("Aborting - unexpected final status message from "
697                       "client: %s\n") % last
698                raise error.AutotestRunError(msg)
699        finally:
700            logger.close()
701            if not self.background:
702                collector.collect_client_job_results()
703                self._process_client_state_file()
704                self.host.job.remove_client_log(hostname, remote_results,
705                                                local_results)
706
707        # should only get here if we timed out
708        assert timeout
709        raise error.AutotestTimeoutError()
710
711
def _get_autodir(host):
    """
    Locate the autotest client directory on host.

    An autodir already recorded on the host wins. Otherwise each path in the
    AUTOSERV/client_autodir_paths config entry is probed for an executable
    bin/autotest, and the first match is returned.

    @param host: A Host instance to inspect.
    @returns: The autotest directory path on host.
    @raises AutotestRunError: if no candidate path contains autotest.
    """
    known_dir = host.get_autodir()
    if known_dir:
        logging.debug('Using existing host autodir: %s', known_dir)
        return known_dir

    candidates = global_config.global_config.get_config_value(
            'AUTOSERV', 'client_autodir_paths', type=list)
    for candidate in candidates:
        binary_path = os.path.join(candidate, 'bin', 'autotest')
        try:
            host.run('test -x %s' % utils.sh_escape(binary_path))
        except error.AutoservRunError:
            # test -x failed: not an install here, keep looking
            logging.debug('%s does not exist on %s', candidate, host.hostname)
            continue
        logging.debug('Found autodir at %s', candidate)
        return candidate
    raise error.AutotestRunError('Cannot figure out autotest directory')
729
730
class log_collector(object):
    """
    Copies the results of a client-side autotest job back to the server.

    The remote results live in <autodir>/results/<tag> on the host, with
    "default" used when no tag is given. collect_client_job_results() copies
    that directory into the server-side results directory.
    """
    def __init__(self, host, client_tag, results_dir):
        self.host = host
        tag_dir = client_tag or "default"
        self.client_results_dir = os.path.join(host.get_autodir(), "results",
                                               tag_dir)
        self.server_results_dir = results_dir


    def collect_client_job_results(self):
        """
        Best-effort copy of the current client results into the server
        results directory.

        First waits for the host (wait_up with timeout=30), ignoring
        AutoservError. Then fetches client_results_dir with symlinks
        preserved. A failed copy is logged, with a traceback printed to
        stdout, and otherwise ignored so missing logs never abort the job.
        """
        try:
            self.host.wait_up(timeout=30)
        except error.AutoservError:
            # an unreachable host is not fatal; the copy may still succeed
            pass

        # the trailing '/' targets the directory's contents (presumably the
        # rsync-style convention of get_file -- confirm against Host)
        source = self.client_results_dir + '/'
        try:
            self.host.get_file(source, self.server_results_dir,
                               preserve_symlinks=True)
        except Exception:
            logging.error(
                "Unexpected error copying test result logs, continuing ...")
            traceback.print_exc(file=sys.stdout)
764
765
766# a file-like object for catching stderr from an autotest client and
767# extracting status logs from it
class client_logger(object):
    """Partial file object that writes to both stdout and the status log.

    Only the methods utils.run() actually calls are implemented.  Client
    output is fed in through write(); lines carrying one of the AUTOTEST_*
    markers below are interpreted, and all other lines are passed to
    logging.info().

    Note that this class is fairly closely coupled with server_job, as it
    uses special job._ methods to actually carry out the logging.
    """
    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
    fetch_package_parser = re.compile(
        r"^AUTOTEST_FETCH_PACKAGE:([^:]*):([^:]*):(.*)$")
    extract_indent = re.compile(r"^(\t*).*$")
    extract_timestamp = re.compile(r".*\ttimestamp=(\d+)\t.*$")

    def __init__(self, host, tag, server_results_dir):
        self.host = host
        self.job = host.job
        self.log_collector = log_collector(host, tag, server_results_dir)
        # trailing text of the last write() that had no newline yet
        self.leftover = ""
        # most recently emitted status line; warnings copy its indentation
        self.last_line = ""
        # largest timestamp=N value seen in any client output line
        self.newest_timestamp = float("-inf")
        # tagged status lines: nested {int tag part: {...}, "logs": [lines]}
        self.logs = {}
        # (timestamp, msg) warnings not yet written into the status log
        self.server_warnings = []


    def _update_timestamp(self, line):
        """Raise self.newest_timestamp to line's timestamp=N field, if any."""
        match = self.extract_timestamp.search(line)
        if match:
            self.newest_timestamp = max(self.newest_timestamp,
                                        int(match.group(1)))


    def _process_log_dict(self, log_dict):
        """Flatten and empty a nested log dict into a list of lines.

        Lines stored directly under "logs" come first.  They are followed by
        the lines of each numeric sub-tag in ascending order, recursively.
        Every entry is popped, so log_dict is left empty.
        """
        log_list = log_dict.pop("logs", [])
        # only int sub-tag keys remain once "logs" is popped; iterating the
        # dict directly (not iterkeys()) also works on Python 3
        for key in sorted(log_dict):
            log_list += self._process_log_dict(log_dict.pop(key))
        return log_list


    def _process_logs(self):
        """Go through the accumulated logs in self.logs and write them
        out to the status log. Note that this processes logs in an
        ordering where:

        1) logs to different tags are never interleaved
        2) logs to x.y come before logs to x.y.z for all z
        3) logs to x.y come before x.z whenever y < z

        Note that this will in general not be the same as the
        chronological ordering of the logs. However, if a chronological
        ordering is desired, one can be reconstructed from the
        status log by looking at timestamp lines."""
        log_list = self._process_log_dict(self.logs)
        for line in log_list:
            self.job._record_prerendered(line + '\n')
        if log_list:
            self.last_line = log_list[-1]


    def _process_quoted_line(self, tag, line):
        """Process a line quoted with an AUTOTEST_STATUS flag. If the
        tag is blank then we want to push out all the data we've been
        building up in self.logs, and then the newest line. If the
        tag is not blank, then push the line into the logs for handling
        later."""
        logging.info(line)
        if tag == "":
            self._process_logs()
            self.job._record_prerendered(line + '\n')
            self.last_line = line
        else:
            # a tag such as "1.2" files the line under self.logs[1][2]
            tag_parts = [int(x) for x in tag.split(".")]
            log_dict = self.logs
            for part in tag_parts:
                log_dict = log_dict.setdefault(part, {})
            log_list = log_dict.setdefault("logs", [])
            log_list.append(line)


    def _process_info_line(self, line):
        """Check if line is an INFO line, and if it is, interpret any control
        messages (e.g. enabling/disabling warnings) that it may contain."""
        match = re.search(r"^\t*INFO\t----\t----(.*)\t[^\t]*$", line)
        if not match:
            return   # not an INFO line
        for field in match.group(1).split('\t'):
            if field.startswith("warnings.enable="):
                func = self.job.warning_manager.enable_warnings
            elif field.startswith("warnings.disable="):
                func = self.job.warning_manager.disable_warnings
            else:
                continue
            warning_type = field.split("=", 1)[1]
            func(warning_type)


    def _process_line(self, line):
        """Dispatch one complete line of client output.

        AUTOTEST_STATUS lines go to the status log.  AUTOTEST_TEST_COMPLETE
        and AUTOTEST_FETCH_PACKAGE lines are client requests; each is
        answered by writing to the fifo path the line names.  All other
        lines (e.g. ssh error messages) are only logged.
        """
        status_match = self.status_parser.search(line)
        test_complete_match = self.test_complete_parser.search(line)
        fetch_package_match = self.fetch_package_parser.search(line)
        if status_match:
            tag, line = status_match.groups()
            self._process_info_line(line)
            self._process_quoted_line(tag, line)
        elif test_complete_match:
            self._process_logs()
            fifo_path, = test_complete_match.groups()
            self.log_collector.collect_client_job_results()
            self.host.run("echo A > %s" % fifo_path)
        elif fetch_package_match:
            pkg_name, dest_path, fifo_path = fetch_package_match.groups()
            serve_packages = global_config.global_config.get_config_value(
                "PACKAGES", "serve_packages_from_autoserv", type=bool)
            if serve_packages and pkg_name.endswith(".tar.bz2"):
                try:
                    self._send_tarball(pkg_name, dest_path)
                except Exception:
                    msg = "Package tarball creation failed, continuing anyway"
                    logging.exception(msg)
            # always unblock the client, whether or not a tarball was sent
            self.host.run("echo B > %s" % fifo_path)
        else:
            logging.info(line)


    def _send_tarball(self, pkg_name, remote_dest):
        """Tar up the server-side sources for pkg_name and send them to
        remote_dest on the host.

        Only test, profiler and dep packages are handled, and other types are
        ignored.  For tests, site_tests/<name> takes precedence over
        tests/<name>.  Nothing is sent if no source directory exists.
        """
        name, pkg_type = self.job.pkgmgr.parse_tarball_name(pkg_name)
        src_dirs = []
        if pkg_type == 'test':
            src_dirs += [os.path.join(self.job.clientdir, 'site_tests', name),
                         os.path.join(self.job.clientdir, 'tests', name)]
        elif pkg_type == 'profiler':
            src_dirs += [os.path.join(self.job.clientdir, 'profilers', name)]
        elif pkg_type == 'dep':
            src_dirs += [os.path.join(self.job.clientdir, 'deps', name)]
        elif pkg_type == 'client':
            return  # you must already have a client to hit this anyway
        else:
            return  # no other types are supported

        # use the first src_dir that exists
        for src_dir in src_dirs:
            if not os.path.exists(src_dir):
                continue
            logging.info('Bundling %s into %s', src_dir, pkg_name)
            # create the tempdir before entering try/finally so that a failure
            # here propagates as-is instead of as an unbound-name error raised
            # from the finally clause
            temp_dir = autotemp.tempdir(unique_id='autoserv-packager',
                                        dir=self.job.tmpdir)
            try:
                tarball_path = self.job.pkgmgr.tar_package(
                    pkg_name, src_dir, temp_dir.name, " .")
                self.host.send_file(tarball_path, remote_dest)
            finally:
                temp_dir.clean()
            return


    def _format_warnings(self, last_line, warnings):
        """Render (timestamp, msg) pairs as WARN status lines.

        The lines get last_line's indentation, plus one extra tab if
        last_line opens a new START group.
        """
        indent = self.extract_indent.match(last_line).group(1)
        if last_line.lstrip('\t').startswith("START\t"):
            indent += '\t'
        return [self.job._render_record("WARN", None, None, msg,
                                        timestamp, indent).rstrip('\n')
                for timestamp, msg in warnings]


    def _process_warnings(self, last_line, log_dict, warnings):
        """Insert rendered warnings into the log tree rooted at log_dict.

        A node with no sub-tags gets the warnings appended to its "logs" list,
        and they are echoed to stdout.  A node with sub-tags passes the
        warnings down to every sub-tag in ascending order.  The last line
        already logged at this node is used as the indentation reference.
        """
        if not warnings:
            return
        # sub-tag keys only; excluding "logs" keeps the sort to ints, which
        # also avoids comparing int with str on Python 3
        sub_keys = sorted([key for key in log_dict if key != "logs"])
        if not sub_keys:
            # there are no sub-jobs, just append the warnings here
            warnings = self._format_warnings(last_line, warnings)
            log_list = log_dict.setdefault("logs", [])
            log_list += warnings
            for warning in warnings:
                sys.stdout.write(warning + '\n')
        else:
            # there are sub-jobs, so put the warnings in there
            log_list = log_dict.get("logs", [])
            if log_list:
                last_line = log_list[-1]
            for key in sub_keys:
                self._process_warnings(last_line, log_dict[key], warnings)


    def log_warning(self, msg, warning_type):
        """Injects a WARN message into the current status logging stream."""
        timestamp = int(time.time())
        if self.job.warning_manager.is_valid(timestamp, warning_type):
            self.server_warnings.append((timestamp, msg))


    def write(self, data):
        """Consume a chunk of client output.

        Each complete line is processed in turn.  Before each line, the
        pending warnings whose timestamps are older than the newest status
        timestamp seen so far are emitted.  Newer warnings stay pending.  A
        trailing partial line is buffered until the next write() or a flush.
        """
        # first check for any new console warnings
        warnings = self.job._read_warnings() + self.server_warnings
        warnings.sort()  # sort into timestamp order
        # now process the newest data written out
        data = self.leftover + data
        lines = data.split("\n")
        # process every line but the last one
        for line in lines[:-1]:
            self._update_timestamp(line)
            # output only the warnings between now and the next status line;
            # because warnings is sorted, these form a prefix of the list
            old_warnings = [(timestamp, msg) for timestamp, msg in warnings
                            if timestamp < self.newest_timestamp]
            self._process_warnings(self.last_line, self.logs, old_warnings)
            del warnings[:len(old_warnings)]
            self._process_line(line)
        # save off any warnings not yet logged for later processing
        self.server_warnings = warnings
        # save the last line for later processing
        # since we may not have the whole line yet
        self.leftover = lines[-1]


    def flush(self):
        """Flush stdout."""
        sys.stdout.flush()


    def flush_all_buffers(self):
        """Emit any buffered partial line, pending warnings and tagged logs."""
        if self.leftover:
            self._process_line(self.leftover)
            self.leftover = ""
        self._process_warnings(self.last_line, self.logs, self.server_warnings)
        # they are now in self.logs; forget them so a repeated flush/close
        # does not emit them a second time
        self.server_warnings = []
        self._process_logs()
        self.flush()


    def close(self):
        """Flush everything still buffered."""
        self.flush_all_buffers()
1003
1004
# Resolve the base class for Autotest via import_site_class, which is handed
# the autotest_lib.server.site_autotest module path, the "SiteAutotest" class
# name and BaseAutotest -- presumably BaseAutotest is the fallback when no
# site-specific module is installed (confirm against client_utils).
SiteAutotest = client_utils.import_site_class(
    __file__,
    "autotest_lib.server.site_autotest",
    "SiteAutotest",
    BaseAutotest,
)


# Public entry point; all behavior is inherited from SiteAutotest.
class Autotest(SiteAutotest):
    pass
1011