autotest.py revision bd450640e5be8e64d244b65488fbfb725056b7f9
1# Copyright 2007 Google Inc. Released under the GPL v2 2 3import re, os, sys, traceback, subprocess, tempfile, time, pickle, glob, logging 4import getpass 5from autotest_lib.server import installable_object, utils 6from autotest_lib.client.common_lib import log, error, autotemp 7from autotest_lib.client.common_lib import global_config, packages 8from autotest_lib.client.common_lib import utils as client_utils 9 10AUTOTEST_SVN = 'svn://test.kernel.org/autotest/trunk/client' 11AUTOTEST_HTTP = 'http://test.kernel.org/svn/autotest/trunk/client' 12 13# Timeouts for powering down and up respectively 14HALT_TIME = 300 15BOOT_TIME = 1800 16CRASH_RECOVERY_TIME = 9000 17 18 19class BaseAutotest(installable_object.InstallableObject): 20 """ 21 This class represents the Autotest program. 22 23 Autotest is used to run tests automatically and collect the results. 24 It also supports profilers. 25 26 Implementation details: 27 This is a leaf class in an abstract class hierarchy, it must 28 implement the unimplemented methods in parent classes. 29 """ 30 31 def __init__(self, host = None): 32 self.host = host 33 self.got = False 34 self.installed = False 35 self.serverdir = utils.get_server_dir() 36 super(BaseAutotest, self).__init__() 37 38 39 install_in_tmpdir = False 40 @classmethod 41 def set_install_in_tmpdir(cls, flag): 42 """ Sets a flag that controls whether or not Autotest should by 43 default be installed in a "standard" directory (e.g. 44 /home/autotest, /usr/local/autotest) or a temporary directory. """ 45 cls.install_in_tmpdir = flag 46 47 48 def _get_install_dir(self, host): 49 """ Determines the location where autotest should be installed on 50 host. If self.install_in_tmpdir is set, it will return a unique 51 temporary directory that autotest can be installed in. """ 52 try: 53 autodir = _get_autodir(host) 54 except error.AutotestRunError: 55 autodir = '/usr/local/autotest' 56 if self.install_in_tmpdir: 57 autodir = host.get_tmp_dir(parent=autodir) 58 return autodir 59 60 61 @log.record 62 def install(self, host=None, autodir=None): 63 self._install(host=host, autodir=autodir) 64 65 66 @log.record 67 def install_full_client(self, host=None, autodir=None): 68 self._install(host=host, autodir=autodir, use_autoserv=False, 69 use_packaging=False) 70 71 72 def install_no_autoserv(self, host=None, autodir=None): 73 self._install(host=host, autodir=autodir, use_autoserv=False) 74 75 76 def _install_using_packaging(self, host, autodir): 77 c = global_config.global_config 78 repos = c.get_config_value("PACKAGES", 'fetch_location', type=list, 79 default=[]) 80 repos.reverse() 81 if not repos: 82 raise error.PackageInstallError("No repos to install an " 83 "autotest client from") 84 pkgmgr = packages.PackageManager(autodir, hostname=host.hostname, 85 repo_urls=repos, 86 do_locking=False, 87 run_function=host.run, 88 run_function_dargs=dict(timeout=600)) 89 # The packages dir is used to store all the packages that 90 # are fetched on that client. (for the tests,deps etc. 91 # too apart from the client) 92 pkg_dir = os.path.join(autodir, 'packages') 93 # clean up the autodir except for the packages directory 94 host.run('cd %s && ls | grep -v "^packages$"' 95 ' | xargs rm -rf && rm -rf .[^.]*' % autodir) 96 pkgmgr.install_pkg('autotest', 'client', pkg_dir, autodir, 97 preserve_install_dir=True) 98 self.installed = True 99 100 101 def _install_using_send_file(self, host, autodir): 102 dirs_to_exclude = set(["tests", "site_tests", "deps", "profilers"]) 103 light_files = [os.path.join(self.source_material, f) 104 for f in os.listdir(self.source_material) 105 if f not in dirs_to_exclude] 106 host.send_file(light_files, autodir, delete_dest=True) 107 108 # create empty dirs for all the stuff we excluded 109 commands = [] 110 for path in dirs_to_exclude: 111 abs_path = os.path.join(autodir, path) 112 abs_path = utils.sh_escape(abs_path) 113 commands.append("mkdir -p '%s'" % abs_path) 114 commands.append("touch '%s'/__init__.py" % abs_path) 115 host.run(';'.join(commands)) 116 117 118 def _install(self, host=None, autodir=None, use_autoserv=True, 119 use_packaging=True): 120 """ 121 Install autotest. If get() was not called previously, an 122 attempt will be made to install from the autotest svn 123 repository. 124 125 @param host A Host instance on which autotest will be installed 126 @param autodir Location on the remote host to install to 127 @param use_autoserv Enable install modes that depend on the client 128 running with the autoserv harness 129 @param use_packaging Enable install modes that use the packaging system 130 131 @exception AutoservError if a tarball was not specified and 132 the target host does not have svn installed in its path 133 """ 134 if not host: 135 host = self.host 136 if not self.got: 137 self.get() 138 host.wait_up(timeout=30) 139 host.setup() 140 logging.info("Installing autotest on %s", host.hostname) 141 142 # set up the autotest directory on the remote machine 143 if not autodir: 144 autodir = self._get_install_dir(host) 145 host.set_autodir(autodir) 146 host.run('mkdir -p %s' % utils.sh_escape(autodir)) 147 148 # make sure there are no files in $AUTODIR/results 149 results_path = os.path.join(autodir, 'results') 150 host.run('rm -rf %s/*' % utils.sh_escape(results_path), 151 ignore_status=True) 152 153 # Fetch the autotest client from the nearest repository 154 if use_packaging: 155 try: 156 self._install_using_packaging(host, autodir) 157 return 158 except global_config.ConfigError, e: 159 logging.info("Could not install autotest using the packaging " 160 "system: %s", e) 161 except (error.PackageInstallError, error.AutoservRunError), e: 162 logging.error("Could not install autotest from repos") 163 164 # try to install from file or directory 165 if self.source_material: 166 if os.path.isdir(self.source_material): 167 c = global_config.global_config 168 supports_autoserv_packaging = c.get_config_value( 169 "PACKAGES", "serve_packages_from_autoserv", type=bool) 170 # Copy autotest recursively 171 if supports_autoserv_packaging and use_autoserv: 172 self._install_using_send_file(host, autodir) 173 else: 174 host.send_file(self.source_material, autodir, 175 delete_dest=True) 176 else: 177 # Copy autotest via tarball 178 e_msg = 'Installation method not yet implemented!' 179 raise NotImplementedError(e_msg) 180 logging.info("Installation of autotest completed") 181 self.installed = True 182 return 183 184 # if that fails try to install using svn 185 if utils.run('which svn').exit_status: 186 raise error.AutoservError('svn not found on target machine: %s' 187 % host.name) 188 try: 189 host.run('svn checkout %s %s' % (AUTOTEST_SVN, autodir)) 190 except error.AutoservRunError, e: 191 host.run('svn checkout %s %s' % (AUTOTEST_HTTP, autodir)) 192 logging.info("Installation of autotest completed") 193 self.installed = True 194 195 196 def uninstall(self, host=None): 197 """ 198 Uninstall (i.e. delete) autotest. Removes the autotest client install 199 from the specified host. 200 201 @params host a Host instance from which the client will be removed 202 """ 203 if not self.installed: 204 return 205 if not host: 206 host = self.host 207 autodir = host.get_autodir() 208 if not autodir: 209 return 210 211 # perform the actual uninstall 212 host.run("rm -rf %s" % utils.sh_escape(autodir), ignore_status=True) 213 host.set_autodir(None) 214 self.installed = False 215 216 217 def get(self, location = None): 218 if not location: 219 location = os.path.join(self.serverdir, '../client') 220 location = os.path.abspath(location) 221 # If there's stuff run on our client directory already, it 222 # can cause problems. Try giving it a quick clean first. 223 cwd = os.getcwd() 224 os.chdir(location) 225 os.system('tools/make_clean') 226 os.chdir(cwd) 227 super(BaseAutotest, self).get(location) 228 self.got = True 229 230 231 def run(self, control_file, results_dir='.', host=None, timeout=None, 232 tag=None, parallel_flag=False, background=False, 233 client_disconnect_timeout=1800, job_tag=''): 234 """ 235 Run an autotest job on the remote machine. 236 237 @param control_file: An open file-like-obj of the control file. 238 @param results_dir: A str path where the results should be stored 239 on the local filesystem. 240 @param host: A Host instance on which the control file should 241 be run. 242 @param timeout: Maximum number of seconds to wait for the run or None. 243 @param tag: Tag name for the client side instance of autotest. 244 @param parallel_flag: Flag set when multiple jobs are run at the 245 same time. 246 @param background: Indicates that the client should be launched as 247 a background job; the code calling run will be responsible 248 for monitoring the client and collecting the results. 249 @param client_disconnect_timeout: Seconds to wait for the remote host 250 to come back after a reboot. [default: 30 minutes] 251 @param job_tag: The scheduler's execution tag for this particular job 252 to pass on to the clients. 'job#-owner/hostgroupname' 253 254 @raises AutotestRunError: If there is a problem executing 255 the control file. 256 """ 257 host = self._get_host_and_setup(host) 258 results_dir = os.path.abspath(results_dir) 259 260 if tag: 261 results_dir = os.path.join(results_dir, tag) 262 263 atrun = _Run(host, results_dir, tag, parallel_flag, background) 264 self._do_run(control_file, results_dir, host, atrun, timeout, 265 client_disconnect_timeout, job_tag) 266 267 268 def _get_host_and_setup(self, host): 269 if not host: 270 host = self.host 271 if not self.installed: 272 self.install(host) 273 274 host.wait_up(timeout=30) 275 return host 276 277 278 def _do_run(self, control_file, results_dir, host, atrun, timeout, 279 client_disconnect_timeout, job_tag): 280 try: 281 atrun.verify_machine() 282 except: 283 logging.error("Verify failed on %s. Reinstalling autotest", 284 host.hostname) 285 self.install(host) 286 atrun.verify_machine() 287 debug = os.path.join(results_dir, 'debug') 288 try: 289 os.makedirs(debug) 290 except Exception: 291 pass 292 293 delete_file_list = [atrun.remote_control_file, 294 atrun.remote_control_file + '.state', 295 atrun.manual_control_file, 296 atrun.manual_control_file + '.state'] 297 cmd = ';'.join('rm -f ' + control for control in delete_file_list) 298 host.run(cmd, ignore_status=True) 299 300 tmppath = utils.get(control_file) 301 302 # build up the initialization prologue for the control file 303 prologue_lines = [] 304 prologue_lines.append("job.set_default_profile_only(%r)\n" 305 % host.job.default_profile_only) 306 prologue_lines.append("job.default_boot_tag(%r)\n" 307 % host.job.last_boot_tag) 308 prologue_lines.append("job.default_test_cleanup(%r)\n" 309 % host.job.run_test_cleanup) 310 if job_tag: 311 prologue_lines.append("job.default_tag(%r)\n" % job_tag) 312 313 # If the packaging system is being used, add the repository list. 314 try: 315 c = global_config.global_config 316 repos = c.get_config_value("PACKAGES", 'fetch_location', type=list) 317 repos.reverse() # high priority packages should be added last 318 pkgmgr = packages.PackageManager('autotest', hostname=host.hostname, 319 repo_urls=repos) 320 prologue_lines.append('job.add_repository(%s)\n' % repos) 321 except global_config.ConfigError, e: 322 pass 323 324 # on full-size installs, turn on any profilers the server is using 325 if not atrun.background: 326 running_profilers = host.job.profilers.add_log.iteritems() 327 for profiler, (args, dargs) in running_profilers: 328 call_args = [repr(profiler)] 329 call_args += [repr(arg) for arg in args] 330 call_args += ["%s=%r" % item for item in dargs.iteritems()] 331 prologue_lines.append("job.profilers.add(%s)\n" 332 % ", ".join(call_args)) 333 cfile = "".join(prologue_lines) 334 335 cfile += open(tmppath).read() 336 open(tmppath, "w").write(cfile) 337 338 # Create and copy state file to remote_control_file + '.state' 339 sysinfo_state = {"__sysinfo": host.job.sysinfo.serialize()} 340 state_file = self._create_state_file(host.job, sysinfo_state) 341 host.send_file(state_file, atrun.remote_control_file + '.state') 342 os.remove(state_file) 343 344 # Copy control_file to remote_control_file on the host 345 host.send_file(tmppath, atrun.remote_control_file) 346 if os.path.abspath(tmppath) != os.path.abspath(control_file): 347 os.remove(tmppath) 348 349 atrun.execute_control( 350 timeout=timeout, 351 client_disconnect_timeout=client_disconnect_timeout) 352 353 354 def _create_state_file(self, job, state_dict): 355 """ Create a state file from a dictionary. Returns the path of the 356 state file. """ 357 fd, path = tempfile.mkstemp(dir=job.tmpdir) 358 state_file = os.fdopen(fd, "w") 359 pickle.dump(state_dict, state_file) 360 state_file.close() 361 return path 362 363 364 def run_timed_test(self, test_name, results_dir='.', host=None, 365 timeout=None, *args, **dargs): 366 """ 367 Assemble a tiny little control file to just run one test, 368 and run it as an autotest client-side test 369 """ 370 if not host: 371 host = self.host 372 if not self.installed: 373 self.install(host) 374 opts = ["%s=%s" % (o[0], repr(o[1])) for o in dargs.items()] 375 cmd = ", ".join([repr(test_name)] + map(repr, args) + opts) 376 control = "job.run_test(%s)\n" % cmd 377 self.run(control, results_dir, host, timeout=timeout) 378 379 380 def run_test(self, test_name, results_dir='.', host=None, *args, **dargs): 381 self.run_timed_test(test_name, results_dir, host, timeout=None, 382 *args, **dargs) 383 384 385class _Run(object): 386 """ 387 Represents a run of autotest control file. This class maintains 388 all the state necessary as an autotest control file is executed. 389 390 It is not intended to be used directly, rather control files 391 should be run using the run method in Autotest. 392 """ 393 def __init__(self, host, results_dir, tag, parallel_flag, background): 394 self.host = host 395 self.results_dir = results_dir 396 self.env = host.env 397 self.tag = tag 398 self.parallel_flag = parallel_flag 399 self.background = background 400 self.autodir = _get_autodir(self.host) 401 control = os.path.join(self.autodir, 'control') 402 if tag: 403 control += '.' + tag 404 self.manual_control_file = control 405 self.remote_control_file = control + '.autoserv' 406 407 408 def verify_machine(self): 409 binary = os.path.join(self.autodir, 'bin/autotest') 410 try: 411 self.host.run('ls %s > /dev/null 2>&1' % binary) 412 except: 413 raise "Autotest does not appear to be installed" 414 415 if not self.parallel_flag: 416 tmpdir = os.path.join(self.autodir, 'tmp') 417 download = os.path.join(self.autodir, 'tests/download') 418 self.host.run('umount %s' % tmpdir, ignore_status=True) 419 self.host.run('umount %s' % download, ignore_status=True) 420 421 422 def get_base_cmd_args(self, section): 423 args = ['--verbose'] 424 if section > 0: 425 args.append('-c') 426 if self.tag: 427 args.append('-t %s' % self.tag) 428 if self.host.job.use_external_logging(): 429 args.append('-l') 430 if self.host.hostname: 431 args.append('--hostname=%s' % self.host.hostname) 432 args.append('--user=%s' % getpass.getuser()) 433 434 args.append(self.remote_control_file) 435 return args 436 437 438 def get_background_cmd(self, section): 439 cmd = ['nohup', os.path.join(self.autodir, 'bin/autotest_client')] 440 cmd += self.get_base_cmd_args(section) 441 cmd.append('>/dev/null 2>/dev/null &') 442 return ' '.join(cmd) 443 444 445 def get_daemon_cmd(self, section, monitor_dir): 446 cmd = ['nohup', os.path.join(self.autodir, 'bin/autotestd'), 447 monitor_dir, '-H autoserv'] 448 cmd += self.get_base_cmd_args(section) 449 cmd.append('>/dev/null 2>/dev/null </dev/null &') 450 return ' '.join(cmd) 451 452 453 def get_monitor_cmd(self, monitor_dir, stdout_read, stderr_read): 454 cmd = [os.path.join(self.autodir, 'bin', 'autotestd_monitor'), 455 monitor_dir, str(stdout_read), str(stderr_read)] 456 return ' '.join(cmd) 457 458 459 def get_client_log(self, section): 460 """ Find what the "next" client.log.* file should be and open it. """ 461 debug_dir = os.path.join(self.results_dir, "debug") 462 client_logs = glob.glob(os.path.join(debug_dir, "client.log.*")) 463 next_log = os.path.join(debug_dir, "client.log.%d" % len(client_logs)) 464 return open(next_log, "w", 0) 465 466 467 @staticmethod 468 def is_client_job_finished(last_line): 469 return bool(re.match(r'^END .*\t----\t----\t.*$', last_line)) 470 471 472 @staticmethod 473 def is_client_job_rebooting(last_line): 474 return bool(re.match(r'^\t*GOOD\t----\treboot\.start.*$', last_line)) 475 476 477 def log_unexpected_abort(self, stderr_redirector): 478 stderr_redirector.flush_all_buffers() 479 msg = "Autotest client terminated unexpectedly" 480 self.host.job.record("END ABORT", None, None, msg) 481 482 483 def _execute_in_background(self, section, timeout): 484 full_cmd = self.get_background_cmd(section) 485 devnull = open(os.devnull, "w") 486 487 old_resultdir = self.host.job.resultdir 488 try: 489 self.host.job.resultdir = self.results_dir 490 result = self.host.run(full_cmd, ignore_status=True, 491 timeout=timeout, 492 stdout_tee=devnull, 493 stderr_tee=devnull) 494 finally: 495 self.host.job.resultdir = old_resultdir 496 497 return result 498 499 500 @staticmethod 501 def _strip_stderr_prologue(stderr): 502 """Strips the 'standard' prologue that get pre-pended to every 503 remote command and returns the text that was actually written to 504 stderr by the remote command.""" 505 stderr_lines = stderr.split("\n")[1:] 506 if not stderr_lines: 507 return "" 508 elif stderr_lines[0].startswith("NOTE: autotestd_monitor"): 509 del stderr_lines[0] 510 return "\n".join(stderr_lines) 511 512 513 def _execute_daemon(self, section, timeout, stderr_redirector, 514 client_disconnect_timeout): 515 monitor_dir = self.host.get_tmp_dir() 516 daemon_cmd = self.get_daemon_cmd(section, monitor_dir) 517 client_log = self.get_client_log(section) 518 519 stdout_read = stderr_read = 0 520 old_resultdir = self.host.job.resultdir 521 try: 522 self.host.job.resultdir = self.results_dir 523 self.host.run(daemon_cmd, ignore_status=True, timeout=timeout) 524 disconnect_warnings = [] 525 while True: 526 monitor_cmd = self.get_monitor_cmd(monitor_dir, stdout_read, 527 stderr_read) 528 try: 529 result = self.host.run(monitor_cmd, ignore_status=True, 530 timeout=timeout, 531 stdout_tee=client_log, 532 stderr_tee=stderr_redirector) 533 except error.AutoservRunError, e: 534 result = e.result_obj 535 result.exit_status = None 536 disconnect_warnings.append(e.description) 537 538 stderr_redirector.log_warning( 539 "Autotest client was disconnected: %s" % e.description, 540 "NETWORK") 541 except error.AutoservSSHTimeout: 542 result = utils.CmdResult(monitor_cmd, "", "", None, 0) 543 stderr_redirector.log_warning( 544 "Attempt to connect to Autotest client timed out", 545 "NETWORK") 546 547 stdout_read += len(result.stdout) 548 stderr_read += len(self._strip_stderr_prologue(result.stderr)) 549 550 if result.exit_status is not None: 551 return result 552 elif not self.host.wait_up(client_disconnect_timeout): 553 raise error.AutoservSSHTimeout( 554 "client was disconnected, reconnect timed out") 555 finally: 556 self.host.job.resultdir = old_resultdir 557 558 559 def execute_section(self, section, timeout, stderr_redirector, 560 client_disconnect_timeout): 561 logging.info("Executing %s/bin/autotest %s/control phase %d", 562 self.autodir, self.autodir, section) 563 564 if self.background: 565 result = self._execute_in_background(section, timeout) 566 else: 567 result = self._execute_daemon(section, timeout, stderr_redirector, 568 client_disconnect_timeout) 569 570 last_line = stderr_redirector.last_line 571 572 # check if we failed hard enough to warrant an exception 573 if result.exit_status == 1: 574 err = error.AutotestRunError("client job was aborted") 575 elif not self.background and not result.stderr: 576 err = error.AutotestRunError( 577 "execute_section %s failed to return anything\n" 578 "stdout:%s\n" % (section, result.stdout)) 579 else: 580 err = None 581 582 # log something if the client failed AND never finished logging 583 if err and not self.is_client_job_finished(last_line): 584 self.log_unexpected_abort(stderr_redirector) 585 586 if err: 587 raise err 588 else: 589 return stderr_redirector.last_line 590 591 592 def _wait_for_reboot(self): 593 logging.info("Client is rebooting") 594 logging.info("Waiting for client to halt") 595 if not self.host.wait_down(HALT_TIME): 596 err = "%s failed to shutdown after %d" 597 err %= (self.host.hostname, HALT_TIME) 598 raise error.AutotestRunError(err) 599 logging.info("Client down, waiting for restart") 600 if not self.host.wait_up(BOOT_TIME): 601 # since reboot failed 602 # hardreset the machine once if possible 603 # before failing this control file 604 warning = "%s did not come back up, hard resetting" 605 warning %= self.host.hostname 606 logging.warning(warning) 607 try: 608 self.host.hardreset(wait=False) 609 except (AttributeError, error.AutoservUnsupportedError): 610 warning = "Hard reset unsupported on %s" 611 warning %= self.host.hostname 612 logging.warning(warning) 613 raise error.AutotestRunError("%s failed to boot after %ds" % 614 (self.host.hostname, BOOT_TIME)) 615 self.host.reboot_followup() 616 617 618 def _process_client_state_file(self): 619 state_file = os.path.basename(self.remote_control_file) + ".state" 620 state_path = os.path.join(self.results_dir, state_file) 621 try: 622 state_dict = pickle.load(open(state_path)) 623 except Exception, e: 624 msg = "Ignoring error while loading client job state file: %s" % e 625 logging.warning(msg) 626 state_dict = {} 627 628 # clear out the state file 629 # TODO: stash the file away somewhere useful instead 630 try: 631 os.remove(state_path) 632 except Exception: 633 pass 634 635 logging.debug("Persistent state variables pulled back from %s: %s", 636 self.host.hostname, state_dict) 637 638 if "__default_profile_only" in state_dict: 639 self.host.job.default_profile_only = ( 640 state_dict["__default_profile_only"]) 641 642 if "__run_test_cleanup" in state_dict: 643 if state_dict["__run_test_cleanup"]: 644 self.host.job.enable_test_cleanup() 645 else: 646 self.host.job.disable_test_cleanup() 647 648 if "__last_boot_tag" in state_dict: 649 self.host.job.last_boot_tag = state_dict["__last_boot_tag"] 650 651 if "__sysinfo" in state_dict: 652 self.host.job.sysinfo.deserialize(state_dict["__sysinfo"]) 653 654 655 def execute_control(self, timeout=None, client_disconnect_timeout=None): 656 if not self.background: 657 collector = log_collector(self.host, self.tag, self.results_dir) 658 hostname = self.host.hostname 659 remote_results = collector.client_results_dir 660 local_results = collector.server_results_dir 661 self.host.job.add_client_log(hostname, remote_results, 662 local_results) 663 664 section = 0 665 start_time = time.time() 666 667 logger = client_logger(self.host, self.tag, self.results_dir) 668 try: 669 while not timeout or time.time() < start_time + timeout: 670 if timeout: 671 section_timeout = start_time + timeout - time.time() 672 else: 673 section_timeout = None 674 last = self.execute_section(section, section_timeout, 675 logger, client_disconnect_timeout) 676 if self.background: 677 return 678 section += 1 679 if self.is_client_job_finished(last): 680 logging.info("Client complete") 681 return 682 elif self.is_client_job_rebooting(last): 683 try: 684 self._wait_for_reboot() 685 except error.AutotestRunError, e: 686 self.host.job.record("ABORT", None, "reboot", str(e)) 687 self.host.job.record("END ABORT", None, None, str(e)) 688 raise 689 continue 690 691 # if we reach here, something unexpected happened 692 self.log_unexpected_abort(logger) 693 694 # give the client machine a chance to recover from a crash 695 self.host.wait_up(CRASH_RECOVERY_TIME) 696 msg = ("Aborting - unexpected final status message from " 697 "client: %s\n") % last 698 raise error.AutotestRunError(msg) 699 finally: 700 logger.close() 701 if not self.background: 702 collector.collect_client_job_results() 703 self._process_client_state_file() 704 self.host.job.remove_client_log(hostname, remote_results, 705 local_results) 706 707 # should only get here if we timed out 708 assert timeout 709 raise error.AutotestTimeoutError() 710 711 712def _get_autodir(host): 713 autodir = host.get_autodir() 714 if autodir: 715 logging.debug('Using existing host autodir: %s', autodir) 716 return autodir 717 client_autodir_paths = global_config.global_config.get_config_value( 718 'AUTOSERV', 'client_autodir_paths', type=list) 719 720 for path in client_autodir_paths: 721 try: 722 autotest_binary = os.path.join(path, 'bin', 'autotest') 723 host.run('test -x %s' % utils.sh_escape(autotest_binary)) 724 logging.debug('Found autodir at %s', path) 725 return path 726 except error.AutoservRunError: 727 logging.debug('%s does not exist on %s', path, host.hostname) 728 raise error.AutotestRunError('Cannot figure out autotest directory') 729 730 731class log_collector(object): 732 def __init__(self, host, client_tag, results_dir): 733 self.host = host 734 if not client_tag: 735 client_tag = "default" 736 self.client_results_dir = os.path.join(host.get_autodir(), "results", 737 client_tag) 738 self.server_results_dir = results_dir 739 740 741 def collect_client_job_results(self): 742 """ A method that collects all the current results of a running 743 client job into the results dir. By default does nothing as no 744 client job is running, but when running a client job you can override 745 this with something that will actually do something. """ 746 747 # make an effort to wait for the machine to come up 748 try: 749 self.host.wait_up(timeout=30) 750 except error.AutoservError: 751 # don't worry about any errors, we'll try and 752 # get the results anyway 753 pass 754 755 # Copy all dirs in default to results_dir 756 try: 757 self.host.get_file(self.client_results_dir + '/', 758 self.server_results_dir, preserve_symlinks=True) 759 except Exception: 760 # well, don't stop running just because we couldn't get logs 761 e_msg = "Unexpected error copying test result logs, continuing ..." 762 logging.error(e_msg) 763 traceback.print_exc(file=sys.stdout) 764 765 766# a file-like object for catching stderr from an autotest client and 767# extracting status logs from it 768class client_logger(object): 769 """Partial file object to write to both stdout and 770 the status log file. We only implement those methods 771 utils.run() actually calls. 772 773 Note that this class is fairly closely coupled with server_job, as it 774 uses special job._ methods to actually carry out the loggging. 775 """ 776 status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$") 777 test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$") 778 fetch_package_parser = re.compile( 779 r"^AUTOTEST_FETCH_PACKAGE:([^:]*):([^:]*):(.*)$") 780 extract_indent = re.compile(r"^(\t*).*$") 781 extract_timestamp = re.compile(r".*\ttimestamp=(\d+)\t.*$") 782 783 def __init__(self, host, tag, server_results_dir): 784 self.host = host 785 self.job = host.job 786 self.log_collector = log_collector(host, tag, server_results_dir) 787 self.leftover = "" 788 self.last_line = "" 789 self.newest_timestamp = float("-inf") 790 self.logs = {} 791 self.server_warnings = [] 792 793 794 def _update_timestamp(self, line): 795 match = self.extract_timestamp.search(line) 796 if match: 797 self.newest_timestamp = max(self.newest_timestamp, 798 int(match.group(1))) 799 800 801 def _process_log_dict(self, log_dict): 802 log_list = log_dict.pop("logs", []) 803 for key in sorted(log_dict.iterkeys()): 804 log_list += self._process_log_dict(log_dict.pop(key)) 805 return log_list 806 807 808 def _process_logs(self): 809 """Go through the accumulated logs in self.log and print them 810 out to stdout and the status log. Note that this processes 811 logs in an ordering where: 812 813 1) logs to different tags are never interleaved 814 2) logs to x.y come before logs to x.y.z for all z 815 3) logs to x.y come before x.z whenever y < z 816 817 Note that this will in general not be the same as the 818 chronological ordering of the logs. However, if a chronological 819 ordering is desired that one can be reconstructed from the 820 status log by looking at timestamp lines.""" 821 log_list = self._process_log_dict(self.logs) 822 for line in log_list: 823 self.job._record_prerendered(line + '\n') 824 if log_list: 825 self.last_line = log_list[-1] 826 827 828 def _process_quoted_line(self, tag, line): 829 """Process a line quoted with an AUTOTEST_STATUS flag. If the 830 tag is blank then we want to push out all the data we've been 831 building up in self.logs, and then the newest line. If the 832 tag is not blank, then push the line into the logs for handling 833 later.""" 834 logging.info(line) 835 if tag == "": 836 self._process_logs() 837 self.job._record_prerendered(line + '\n') 838 self.last_line = line 839 else: 840 tag_parts = [int(x) for x in tag.split(".")] 841 log_dict = self.logs 842 for part in tag_parts: 843 log_dict = log_dict.setdefault(part, {}) 844 log_list = log_dict.setdefault("logs", []) 845 log_list.append(line) 846 847 848 def _process_info_line(self, line): 849 """Check if line is an INFO line, and if it is, interpret any control 850 messages (e.g. enabling/disabling warnings) that it may contain.""" 851 match = re.search(r"^\t*INFO\t----\t----(.*)\t[^\t]*$", line) 852 if not match: 853 return # not an INFO line 854 for field in match.group(1).split('\t'): 855 if field.startswith("warnings.enable="): 856 func = self.job.warning_manager.enable_warnings 857 elif field.startswith("warnings.disable="): 858 func = self.job.warning_manager.disable_warnings 859 else: 860 continue 861 warning_type = field.split("=", 1)[1] 862 func(warning_type) 863 864 865 def _process_line(self, line): 866 """Write out a line of data to the appropriate stream. Status 867 lines sent by autotest will be prepended with 868 "AUTOTEST_STATUS", and all other lines are ssh error 869 messages.""" 870 status_match = self.status_parser.search(line) 871 test_complete_match = self.test_complete_parser.search(line) 872 fetch_package_match = self.fetch_package_parser.search(line) 873 if status_match: 874 tag, line = status_match.groups() 875 self._process_info_line(line) 876 self._process_quoted_line(tag, line) 877 elif test_complete_match: 878 self._process_logs() 879 fifo_path, = test_complete_match.groups() 880 self.log_collector.collect_client_job_results() 881 self.host.run("echo A > %s" % fifo_path) 882 elif fetch_package_match: 883 pkg_name, dest_path, fifo_path = fetch_package_match.groups() 884 serve_packages = global_config.global_config.get_config_value( 885 "PACKAGES", "serve_packages_from_autoserv", type=bool) 886 if serve_packages and pkg_name.endswith(".tar.bz2"): 887 try: 888 self._send_tarball(pkg_name, dest_path) 889 except Exception: 890 msg = "Package tarball creation failed, continuing anyway" 891 logging.exception(msg) 892 self.host.run("echo B > %s" % fifo_path) 893 else: 894 logging.info(line) 895 896 897 def _send_tarball(self, pkg_name, remote_dest): 898 name, pkg_type = self.job.pkgmgr.parse_tarball_name(pkg_name) 899 src_dirs = [] 900 if pkg_type == 'test': 901 src_dirs += [os.path.join(self.job.clientdir, 'site_tests', name), 902 os.path.join(self.job.clientdir, 'tests', name)] 903 elif pkg_type == 'profiler': 904 src_dirs += [os.path.join(self.job.clientdir, 'profilers', name)] 905 elif pkg_type == 'dep': 906 src_dirs += [os.path.join(self.job.clientdir, 'deps', name)] 907 elif pkg_type == 'client': 908 return # you must already have a client to hit this anyway 909 else: 910 return # no other types are supported 911 912 # iterate over src_dirs until we find one that exists, then tar it 913 for src_dir in src_dirs: 914 if os.path.exists(src_dir): 915 try: 916 logging.info('Bundling %s into %s', src_dir, pkg_name) 917 temp_dir = autotemp.tempdir(unique_id='autoserv-packager', 918 dir=self.job.tmpdir) 919 tarball_path = self.job.pkgmgr.tar_package( 920 pkg_name, src_dir, temp_dir.name, " .") 921 self.host.send_file(tarball_path, remote_dest) 922 finally: 923 temp_dir.clean() 924 return 925 926 927 def _format_warnings(self, last_line, warnings): 928 # use the indentation of whatever the last log line was 929 indent = self.extract_indent.match(last_line).group(1) 930 # if the last line starts a new group, add an extra indent 931 if last_line.lstrip('\t').startswith("START\t"): 932 indent += '\t' 933 return [self.job._render_record("WARN", None, None, msg, 934 timestamp, indent).rstrip('\n') 935 for timestamp, msg in warnings] 936 937 938 def _process_warnings(self, last_line, log_dict, warnings): 939 if log_dict.keys() in ([], ["logs"]): 940 # there are no sub-jobs, just append the warnings here 941 warnings = self._format_warnings(last_line, warnings) 942 log_list = log_dict.setdefault("logs", []) 943 log_list += warnings 944 for warning in warnings: 945 sys.stdout.write(warning + '\n') 946 else: 947 # there are sub-jobs, so put the warnings in there 948 log_list = log_dict.get("logs", []) 949 if log_list: 950 last_line = log_list[-1] 951 for key in sorted(log_dict.iterkeys()): 952 if key != "logs": 953 self._process_warnings(last_line, 954 log_dict[key], 955 warnings) 956 957 958 def log_warning(self, msg, warning_type): 959 """Injects a WARN message into the current status logging stream.""" 960 timestamp = int(time.time()) 961 if self.job.warning_manager.is_valid(timestamp, warning_type): 962 self.server_warnings.append((timestamp, msg)) 963 964 965 def write(self, data): 966 # first check for any new console warnings 967 warnings = self.job._read_warnings() + self.server_warnings 968 warnings.sort() # sort into timestamp order 969 # now process the newest data written out 970 data = self.leftover + data 971 lines = data.split("\n") 972 # process every line but the last one 973 for line in lines[:-1]: 974 self._update_timestamp(line) 975 # output any warnings between now and the next status line 976 old_warnings = [(timestamp, msg) for timestamp, msg in warnings 977 if timestamp < self.newest_timestamp] 978 self._process_warnings(self.last_line, self.logs, warnings) 979 del warnings[:len(old_warnings)] 980 self._process_line(line) 981 # save off any warnings not yet logged for later processing 982 self.server_warnings = warnings 983 # save the last line for later processing 984 # since we may not have the whole line yet 985 self.leftover = lines[-1] 986 987 988 def flush(self): 989 sys.stdout.flush() 990 991 992 def flush_all_buffers(self): 993 if self.leftover: 994 self._process_line(self.leftover) 995 self.leftover = "" 996 self._process_warnings(self.last_line, self.logs, self.server_warnings) 997 self._process_logs() 998 self.flush() 999 1000 1001 def close(self): 1002 self.flush_all_buffers() 1003 1004 1005SiteAutotest = client_utils.import_site_class( 1006 __file__, "autotest_lib.server.site_autotest", "SiteAutotest", 1007 BaseAutotest) 1008 1009class Autotest(SiteAutotest): 1010 pass 1011