autotest.py revision a700772e70548b10721ba709c7ec5194bb6d7597
1# Copyright 2007 Google Inc. Released under the GPL v2 2 3import re, os, sys, traceback, subprocess, tempfile, time, pickle, glob 4from autotest_lib.server import installable_object, utils 5from autotest_lib.client.common_lib import log, error, debug 6from autotest_lib.client.common_lib import global_config, packages 7from autotest_lib.client.common_lib import utils as client_utils 8 9AUTOTEST_SVN = 'svn://test.kernel.org/autotest/trunk/client' 10AUTOTEST_HTTP = 'http://test.kernel.org/svn/autotest/trunk/client' 11 12# Timeouts for powering down and up respectively 13HALT_TIME = 300 14BOOT_TIME = 1800 15CRASH_RECOVERY_TIME = 9000 16 17 18class BaseAutotest(installable_object.InstallableObject): 19 """ 20 This class represents the Autotest program. 21 22 Autotest is used to run tests automatically and collect the results. 23 It also supports profilers. 24 25 Implementation details: 26 This is a leaf class in an abstract class hierarchy, it must 27 implement the unimplemented methods in parent classes. 28 """ 29 30 def __init__(self, host = None): 31 self.host = host 32 self.got = False 33 self.installed = False 34 self.serverdir = utils.get_server_dir() 35 super(BaseAutotest, self).__init__() 36 self.logger = debug.get_logger(module='server') 37 38 39 install_in_tmpdir = False 40 @classmethod 41 def set_install_in_tmpdir(cls, flag): 42 """ Sets a flag that controls whether or not Autotest should by 43 default be installed in a "standard" directory (e.g. 44 /home/autotest, /usr/local/autotest) or a temporary directory. """ 45 cls.install_in_tmpdir = flag 46 47 48 def _get_install_dir(self, host): 49 """ Determines the location where autotest should be installed on 50 host. If self.install_in_tmpdir is set, it will return a unique 51 temporary directory that autotest can be installed in. """ 52 try: 53 autodir = _get_autodir(host) 54 except error.AutotestRunError: 55 autodir = '/usr/local/autotest' 56 if self.install_in_tmpdir: 57 autodir = host.get_tmp_dir(parent=autodir) 58 return autodir 59 60 61 @log.record 62 def install(self, host=None, autodir=None): 63 self._install(host=host, autodir=autodir) 64 65 66 def install_base(self, host=None, autodir=None): 67 """ Performs a lightweight autotest install. Useful for when you 68 want to run some client-side code but don't want to pay the cost 69 of a full installation. """ 70 self._install(host=host, autodir=autodir, lightweight=True) 71 72 73 def _install(self, host=None, autodir=None, lightweight=False): 74 """ 75 Install autotest. If get() was not called previously, an 76 attempt will be made to install from the autotest svn 77 repository. 78 79 Args: 80 host: a Host instance on which autotest will be installed 81 autodir: location on the remote host to install to 82 lightweight: exclude tests, deps and profilers, if possible 83 84 Raises: 85 AutoservError: if a tarball was not specified and 86 the target host does not have svn installed in its path""" 87 if not host: 88 host = self.host 89 if not self.got: 90 self.get() 91 host.wait_up(timeout=30) 92 host.setup() 93 print "Installing autotest on %s" % host.hostname 94 95 # set up the autotest directory on the remote machine 96 if not autodir: 97 autodir = self._get_install_dir(host) 98 host.set_autodir(autodir) 99 host.run('mkdir -p "%s"' % utils.sh_escape(autodir)) 100 101 # Fetch the autotest client from the nearest repository 102 try: 103 c = global_config.global_config 104 repos = c.get_config_value("PACKAGES", 'fetch_location', type=list) 105 pkgmgr = packages.PackageManager(autodir, hostname=host.hostname, 106 repo_urls=repos, 107 do_locking=False, 108 run_function=host.run, 109 run_function_dargs=dict(timeout=600)) 110 # The packages dir is used to store all the packages that 111 # are fetched on that client. (for the tests,deps etc. 112 # too apart from the client) 113 pkg_dir = os.path.join(autodir, 'packages') 114 # clean up the autodir except for the packages directory 115 host.run('cd %s && ls | grep -v "^packages$"' 116 ' | xargs rm -rf && rm -rf .[^.]*' % autodir) 117 pkgmgr.install_pkg('autotest', 'client', pkg_dir, autodir, 118 preserve_install_dir=True) 119 self.installed = True 120 return 121 except global_config.ConfigError, e: 122 print ("Could not install autotest using the" 123 " packaging system %s" % e) 124 except (packages.PackageInstallError, error.AutoservRunError), e: 125 print "Could not install autotest from %s : %s " % (repos, e) 126 127 128 # try to install from file or directory 129 if self.source_material: 130 if os.path.isdir(self.source_material): 131 # Copy autotest recursively 132 if lightweight: 133 dirs_to_exclude = set(["tests", "site_tests", "deps", 134 "tools", "profilers"]) 135 light_files = [os.path.join(self.source_material, f) 136 for f in os.listdir(self.source_material) 137 if f not in dirs_to_exclude] 138 host.send_file(light_files, autodir, delete_dest=True) 139 140 # create empty dirs for all the stuff we excluded 141 commands = [] 142 for path in dirs_to_exclude: 143 abs_path = os.path.join(autodir, path) 144 abs_path = utils.sh_escape(abs_path) 145 commands.append("mkdir -p '%s'" % abs_path) 146 host.run(';'.join(commands)) 147 else: 148 host.send_file(self.source_material, autodir, 149 delete_dest=True) 150 else: 151 # Copy autotest via tarball 152 e_msg = 'Installation method not yet implemented!' 153 raise NotImplementedError(e_msg) 154 print "Installation of autotest completed" 155 self.installed = True 156 return 157 158 # if that fails try to install using svn 159 if utils.run('which svn').exit_status: 160 raise error.AutoservError('svn not found on target machine: %s' 161 % host.name) 162 try: 163 host.run('svn checkout %s %s' % (AUTOTEST_SVN, autodir)) 164 except error.AutoservRunError, e: 165 host.run('svn checkout %s %s' % (AUTOTEST_HTTP, autodir)) 166 print "Installation of autotest completed" 167 self.installed = True 168 169 170 def get(self, location = None): 171 if not location: 172 location = os.path.join(self.serverdir, '../client') 173 location = os.path.abspath(location) 174 # If there's stuff run on our client directory already, it 175 # can cause problems. Try giving it a quick clean first. 176 cwd = os.getcwd() 177 os.chdir(location) 178 os.system('tools/make_clean') 179 os.chdir(cwd) 180 super(BaseAutotest, self).get(location) 181 self.got = True 182 183 184 def run(self, control_file, results_dir = '.', host = None, 185 timeout=None, tag=None, parallel_flag=False, background=False): 186 """ 187 Run an autotest job on the remote machine. 188 189 Args: 190 control_file: an open file-like-obj of the control file 191 results_dir: a str path where the results should be stored 192 on the local filesystem 193 host: a Host instance on which the control file should 194 be run 195 tag: tag name for the client side instance of autotest 196 parallel_flag: flag set when multiple jobs are run at the 197 same time 198 background: indicates that the client should be launched as 199 a background job; the code calling run will be 200 responsible for monitoring the client and 201 collecting the results 202 Raises: 203 AutotestRunError: if there is a problem executing 204 the control file 205 """ 206 host = self._get_host_and_setup(host) 207 results_dir = os.path.abspath(results_dir) 208 209 if tag: 210 results_dir = os.path.join(results_dir, tag) 211 212 atrun = _Run(host, results_dir, tag, parallel_flag, background) 213 self._do_run(control_file, results_dir, host, atrun, timeout) 214 215 216 def _get_host_and_setup(self, host): 217 if not host: 218 host = self.host 219 if not self.installed: 220 self.install(host) 221 222 host.wait_up(timeout=30) 223 return host 224 225 226 def _do_run(self, control_file, results_dir, host, atrun, timeout): 227 try: 228 atrun.verify_machine() 229 except: 230 print "Verify failed on %s. Reinstalling autotest" % host.hostname 231 self.install(host) 232 atrun.verify_machine() 233 debug = os.path.join(results_dir, 'debug') 234 try: 235 os.makedirs(debug) 236 except Exception: 237 pass 238 239 delete_file_list = [atrun.remote_control_file, 240 atrun.remote_control_file + '.state', 241 atrun.manual_control_file, 242 atrun.manual_control_file + '.state'] 243 cmd = ';'.join('rm -f ' + control for control in delete_file_list) 244 host.run(cmd, ignore_status=True) 245 246 tmppath = utils.get(control_file) 247 248 cfile = "job.default_boot_tag(%r)\n" % host.job.last_boot_tag 249 cfile += "job.default_test_cleanup(%r)\n" % host.job.run_test_cleanup 250 251 # If the packaging system is being used, add the repository list. 252 try: 253 c = global_config.global_config 254 repos = c.get_config_value("PACKAGES", 'fetch_location', type=list) 255 pkgmgr = packages.PackageManager('autotest', hostname=host.hostname, 256 repo_urls=repos) 257 cfile += 'job.add_repository(%s)\n' % pkgmgr.repo_urls 258 except global_config.ConfigError, e: 259 pass 260 261 cfile += open(tmppath).read() 262 open(tmppath, "w").write(cfile) 263 264 # Create and copy state file to remote_control_file + '.state' 265 sysinfo_state = {"__sysinfo": host.job.sysinfo.serialize()} 266 state_file = self._create_state_file(host.job, sysinfo_state) 267 host.send_file(state_file, atrun.remote_control_file + '.state') 268 os.remove(state_file) 269 270 # Copy control_file to remote_control_file on the host 271 host.send_file(tmppath, atrun.remote_control_file) 272 if os.path.abspath(tmppath) != os.path.abspath(control_file): 273 os.remove(tmppath) 274 275 try: 276 atrun.execute_control(timeout=timeout) 277 finally: 278 if not atrun.background: 279 collector = log_collector(host, atrun.tag, results_dir) 280 collector.collect_client_job_results() 281 self._process_client_state_file(host, atrun, results_dir) 282 283 284 def _create_state_file(self, job, state_dict): 285 """ Create a state file from a dictionary. Returns the path of the 286 state file. """ 287 fd, path = tempfile.mkstemp(dir=job.tmpdir) 288 state_file = os.fdopen(fd, "w") 289 pickle.dump(state_dict, state_file) 290 state_file.close() 291 return path 292 293 294 def _process_client_state_file(self, host, atrun, results_dir): 295 state_file = os.path.basename(atrun.remote_control_file) + ".state" 296 state_path = os.path.join(results_dir, state_file) 297 try: 298 state_dict = pickle.load(open(state_path)) 299 except Exception, e: 300 msg = "Ignoring error while loading client job state file: %s" % e 301 self.logger.warning(msg) 302 state_dict = {} 303 304 # clear out the state file 305 # TODO: stash the file away somewhere useful instead 306 try: 307 os.remove(state_path) 308 except Exception: 309 pass 310 311 msg = "Persistent state variables pulled back from %s: %s" 312 msg %= (host.hostname, state_dict) 313 print msg 314 315 if "__run_test_cleanup" in state_dict: 316 if state_dict["__run_test_cleanup"]: 317 host.job.enable_test_cleanup() 318 else: 319 host.job.disable_test_cleanup() 320 321 if "__last_boot_tag" in state_dict: 322 host.job.last_boot_tag = state_dict["__last_boot_tag"] 323 324 if "__sysinfo" in state_dict: 325 host.job.sysinfo.deserialize(state_dict["__sysinfo"]) 326 327 328 def run_timed_test(self, test_name, results_dir='.', host=None, 329 timeout=None, *args, **dargs): 330 """ 331 Assemble a tiny little control file to just run one test, 332 and run it as an autotest client-side test 333 """ 334 if not host: 335 host = self.host 336 if not self.installed: 337 self.install(host) 338 opts = ["%s=%s" % (o[0], repr(o[1])) for o in dargs.items()] 339 cmd = ", ".join([repr(test_name)] + map(repr, args) + opts) 340 control = "job.run_test(%s)\n" % cmd 341 self.run(control, results_dir, host, timeout=timeout) 342 343 344 def run_test(self, test_name, results_dir='.', host=None, *args, **dargs): 345 self.run_timed_test(test_name, results_dir, host, timeout=None, 346 *args, **dargs) 347 348 349class _Run(object): 350 """ 351 Represents a run of autotest control file. This class maintains 352 all the state necessary as an autotest control file is executed. 353 354 It is not intended to be used directly, rather control files 355 should be run using the run method in Autotest. 356 """ 357 def __init__(self, host, results_dir, tag, parallel_flag, background): 358 self.host = host 359 self.results_dir = results_dir 360 self.env = host.env 361 self.tag = tag 362 self.parallel_flag = parallel_flag 363 self.background = background 364 self.autodir = _get_autodir(self.host) 365 control = os.path.join(self.autodir, 'control') 366 if tag: 367 control += '.' + tag 368 self.manual_control_file = control 369 self.remote_control_file = control + '.autoserv' 370 self.logger = debug.get_logger(module='server') 371 372 373 def verify_machine(self): 374 binary = os.path.join(self.autodir, 'bin/autotest') 375 try: 376 self.host.run('ls %s > /dev/null 2>&1' % binary) 377 except: 378 raise "Autotest does not appear to be installed" 379 380 if not self.parallel_flag: 381 tmpdir = os.path.join(self.autodir, 'tmp') 382 download = os.path.join(self.autodir, 'tests/download') 383 self.host.run('umount %s' % tmpdir, ignore_status=True) 384 self.host.run('umount %s' % download, ignore_status=True) 385 386 def get_full_cmd(self, section): 387 # build up the full command we want to run over the host 388 cmd = [os.path.join(self.autodir, 'bin/autotest_client')] 389 if not self.background: 390 cmd.append('-H autoserv') 391 if section > 0: 392 cmd.append('-c') 393 if self.tag: 394 cmd.append('-t %s' % self.tag) 395 if self.host.job.use_external_logging(): 396 cmd.append('-l') 397 cmd.append(self.remote_control_file) 398 if self.background: 399 cmd = ['nohup'] + cmd + ['>/dev/null 2>/dev/null &'] 400 return ' '.join(cmd) 401 402 403 def get_client_log(self, section): 404 """ Find what the "next" client.log.* file should be and open it. """ 405 debug_dir = os.path.join(self.results_dir, "debug") 406 client_logs = glob.glob(os.path.join(debug_dir, "client.log.*")) 407 next_log = os.path.join(debug_dir, "client.log.%d" % len(client_logs)) 408 return open(next_log, "w", 0) 409 410 411 @staticmethod 412 def is_client_job_finished(last_line): 413 return bool(re.match(r'^END .*\t----\t----\t.*$', last_line)) 414 415 416 @staticmethod 417 def is_client_job_rebooting(last_line): 418 return bool(re.match(r'^\t*GOOD\t----\treboot\.start.*$', last_line)) 419 420 421 def log_unexpected_abort(self): 422 msg = "Autotest client terminated unexpectedly" 423 self.host.job.record("END ABORT", None, None, msg) 424 425 426 def execute_section(self, section, timeout, stderr_redirector): 427 print "Executing %s/bin/autotest %s/control phase %d" % \ 428 (self.autodir, self.autodir, section) 429 430 full_cmd = self.get_full_cmd(section) 431 client_log = self.get_client_log(section) 432 433 try: 434 old_resultdir = self.host.job.resultdir 435 self.host.job.resultdir = self.results_dir 436 result = self.host.run(full_cmd, ignore_status=True, 437 timeout=timeout, 438 stdout_tee=client_log, 439 stderr_tee=stderr_redirector) 440 finally: 441 self.host.job.resultdir = old_resultdir 442 last_line = stderr_redirector.last_line 443 444 # check if we failed hard enough to warrant an exception 445 if result.exit_status == 1: 446 err = error.AutotestRunError("client job was aborted") 447 elif not self.background and not result.stderr: 448 err = error.AutotestRunError( 449 "execute_section: %s failed to return anything\n" 450 "stdout:%s\n" % (full_cmd, result.stdout)) 451 else: 452 err = None 453 454 # log something if the client failed AND never finished logging 455 if err and not self.is_client_job_finished(last_line): 456 self.log_unexpected_abort() 457 458 if err: 459 raise err 460 else: 461 return stderr_redirector.last_line 462 463 464 def _wait_for_reboot(self): 465 self.logger.info("Client is rebooting") 466 self.logger.info("Waiting for client to halt") 467 if not self.host.wait_down(HALT_TIME): 468 err = "%s failed to shutdown after %d" 469 err %= (self.host.hostname, HALT_TIME) 470 raise error.AutotestRunError(err) 471 self.logger.info("Client down, waiting for restart") 472 if not self.host.wait_up(BOOT_TIME): 473 # since reboot failed 474 # hardreset the machine once if possible 475 # before failing this control file 476 warning = "%s did not come back up, hard resetting" 477 warning %= self.host.hostname 478 self.logger.warning(warning) 479 try: 480 self.host.hardreset(wait=False) 481 except (AttributeError, error.AutoservUnsupportedError): 482 warning = "Hard reset unsupported on %s" 483 warning %= self.host.hostname 484 self.logger.warning(warning) 485 raise error.AutotestRunError("%s failed to boot after %ds" % 486 (self.host.hostname, BOOT_TIME)) 487 self.host.reboot_followup() 488 489 490 def execute_control(self, timeout=None): 491 section = 0 492 start_time = time.time() 493 494 logger = client_logger(self.host, self.tag, self.results_dir) 495 try: 496 while not timeout or time.time() < start_time + timeout: 497 if timeout: 498 section_timeout = start_time + timeout - time.time() 499 else: 500 section_timeout = None 501 last = self.execute_section(section, section_timeout, 502 logger) 503 if self.background: 504 return 505 section += 1 506 if self.is_client_job_finished(last): 507 print "Client complete" 508 return 509 elif self.is_client_job_rebooting(last): 510 try: 511 self._wait_for_reboot() 512 except error.AutotestRunError, e: 513 self.host.job.record("ABORT", None, "reboot", str(e)) 514 self.host.job.record("END ABORT", None, None, str(e)) 515 raise 516 continue 517 518 # if we reach here, something unexpected happened 519 self.log_unexpected_abort() 520 521 # give the client machine a chance to recover from a crash 522 self.host.wait_up(CRASH_RECOVERY_TIME) 523 msg = ("Aborting - unexpected final status message from " 524 "client: %s\n") % last 525 raise error.AutotestRunError(msg) 526 finally: 527 logger.close() 528 529 # should only get here if we timed out 530 assert timeout 531 raise error.AutotestTimeoutError() 532 533 534def _get_autodir(host): 535 autodir = host.get_autodir() 536 if autodir: 537 return autodir 538 try: 539 # There's no clean way to do this. readlink may not exist 540 cmd = "python -c 'import os,sys; print os.readlink(sys.argv[1])' /etc/autotest.conf 2> /dev/null" 541 autodir = os.path.dirname(host.run(cmd).stdout) 542 if autodir: 543 return autodir 544 except error.AutoservRunError: 545 pass 546 for path in ['/usr/local/autotest', '/home/autotest']: 547 try: 548 host.run('ls %s > /dev/null 2>&1' % 549 os.path.join(path, 'bin/autotest')) 550 return path 551 except error.AutoservRunError: 552 pass 553 raise error.AutotestRunError("Cannot figure out autotest directory") 554 555 556class log_collector(object): 557 def __init__(self, host, client_tag, results_dir): 558 self.host = host 559 if not client_tag: 560 client_tag = "default" 561 self.client_results_dir = os.path.join(host.get_autodir(), "results", 562 client_tag) 563 self.server_results_dir = results_dir 564 565 566 def collect_client_job_results(self): 567 """ A method that collects all the current results of a running 568 client job into the results dir. By default does nothing as no 569 client job is running, but when running a client job you can override 570 this with something that will actually do something. """ 571 572 # make an effort to wait for the machine to come up 573 try: 574 self.host.wait_up(timeout=30) 575 except error.AutoservError: 576 # don't worry about any errors, we'll try and 577 # get the results anyway 578 pass 579 580 581 # Copy all dirs in default to results_dir 582 try: 583 keyval_path = self._prepare_for_copying_logs() 584 self.host.get_file(self.client_results_dir + '/', 585 self.server_results_dir) 586 self._process_copied_logs(keyval_path) 587 self._postprocess_copied_logs() 588 except Exception: 589 # well, don't stop running just because we couldn't get logs 590 print "Unexpected error copying test result logs, continuing ..." 591 traceback.print_exc(file=sys.stdout) 592 593 594 def _prepare_for_copying_logs(self): 595 server_keyval = os.path.join(self.server_results_dir, 'keyval') 596 if not os.path.exists(server_keyval): 597 # Client-side keyval file can be copied directly 598 return 599 600 # Copy client-side keyval to temporary location 601 suffix = '.keyval_%s' % self.host.hostname 602 fd, keyval_path = tempfile.mkstemp(suffix) 603 os.close(fd) 604 try: 605 client_keyval = os.path.join(self.client_results_dir, 'keyval') 606 try: 607 self.host.get_file(client_keyval, keyval_path) 608 finally: 609 # We will squirrel away the client side keyval 610 # away and move it back when we are done 611 remote_temp_dir = self.host.get_tmp_dir() 612 self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval") 613 self.host.run('mv %s %s' % (client_keyval, 614 self.temp_keyval_path)) 615 except (error.AutoservRunError, error.AutoservSSHTimeout): 616 print "Prepare for copying logs failed" 617 return keyval_path 618 619 620 def _process_copied_logs(self, keyval_path): 621 if not keyval_path: 622 # Client-side keyval file was copied directly 623 return 624 625 # Append contents of keyval_<host> file to keyval file 626 try: 627 # Read in new and old keyval files 628 new_keyval = utils.read_keyval(keyval_path) 629 old_keyval = utils.read_keyval(self.server_results_dir) 630 # 'Delete' from new keyval entries that are in both 631 tmp_keyval = {} 632 for key, val in new_keyval.iteritems(): 633 if key not in old_keyval: 634 tmp_keyval[key] = val 635 # Append new info to keyval file 636 utils.write_keyval(self.server_results_dir, tmp_keyval) 637 # Delete keyval_<host> file 638 os.remove(keyval_path) 639 except IOError: 640 print "Process copied logs failed" 641 642 643 def _postprocess_copied_logs(self): 644 # we can now put our keyval file back 645 client_keyval = os.path.join(self.client_results_dir, 'keyval') 646 try: 647 self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval)) 648 except Exception: 649 pass 650 651 652 653# a file-like object for catching stderr from an autotest client and 654# extracting status logs from it 655class client_logger(object): 656 """Partial file object to write to both stdout and 657 the status log file. We only implement those methods 658 utils.run() actually calls. 659 660 Note that this class is fairly closely coupled with server_job, as it 661 uses special job._ methods to actually carry out the loggging. 662 """ 663 status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$") 664 test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$") 665 extract_indent = re.compile(r"^(\t*).*$") 666 667 def __init__(self, host, tag, server_results_dir): 668 self.host = host 669 self.job = host.job 670 self.log_collector = log_collector(host, tag, server_results_dir) 671 self.leftover = "" 672 self.last_line = "" 673 self.logs = {} 674 675 676 def _process_log_dict(self, log_dict): 677 log_list = log_dict.pop("logs", []) 678 for key in sorted(log_dict.iterkeys()): 679 log_list += self._process_log_dict(log_dict.pop(key)) 680 return log_list 681 682 683 def _process_logs(self): 684 """Go through the accumulated logs in self.log and print them 685 out to stdout and the status log. Note that this processes 686 logs in an ordering where: 687 688 1) logs to different tags are never interleaved 689 2) logs to x.y come before logs to x.y.z for all z 690 3) logs to x.y come before x.z whenever y < z 691 692 Note that this will in general not be the same as the 693 chronological ordering of the logs. However, if a chronological 694 ordering is desired that one can be reconstructed from the 695 status log by looking at timestamp lines.""" 696 log_list = self._process_log_dict(self.logs) 697 for line in log_list: 698 self.job._record_prerendered(line + '\n') 699 if log_list: 700 self.last_line = log_list[-1] 701 702 703 def _process_quoted_line(self, tag, line): 704 """Process a line quoted with an AUTOTEST_STATUS flag. If the 705 tag is blank then we want to push out all the data we've been 706 building up in self.logs, and then the newest line. If the 707 tag is not blank, then push the line into the logs for handling 708 later.""" 709 print line 710 if tag == "": 711 self._process_logs() 712 self.job._record_prerendered(line + '\n') 713 self.last_line = line 714 else: 715 tag_parts = [int(x) for x in tag.split(".")] 716 log_dict = self.logs 717 for part in tag_parts: 718 log_dict = log_dict.setdefault(part, {}) 719 log_list = log_dict.setdefault("logs", []) 720 log_list.append(line) 721 722 723 def _process_line(self, line): 724 """Write out a line of data to the appropriate stream. Status 725 lines sent by autotest will be prepended with 726 "AUTOTEST_STATUS", and all other lines are ssh error 727 messages.""" 728 status_match = self.status_parser.search(line) 729 test_complete_match = self.test_complete_parser.search(line) 730 if status_match: 731 tag, line = status_match.groups() 732 self._process_quoted_line(tag, line) 733 elif test_complete_match: 734 fifo_path, = test_complete_match.groups() 735 self.log_collector.collect_client_job_results() 736 self.host.run("echo A > %s" % fifo_path) 737 else: 738 print line 739 740 741 def _format_warnings(self, last_line, warnings): 742 # use the indentation of whatever the last log line was 743 indent = self.extract_indent.match(last_line).group(1) 744 # if the last line starts a new group, add an extra indent 745 if last_line.lstrip('\t').startswith("START\t"): 746 indent += '\t' 747 return [self.job._render_record("WARN", None, None, msg, 748 timestamp, indent).rstrip('\n') 749 for timestamp, msg in warnings] 750 751 752 def _process_warnings(self, last_line, log_dict, warnings): 753 if log_dict.keys() in ([], ["logs"]): 754 # there are no sub-jobs, just append the warnings here 755 warnings = self._format_warnings(last_line, warnings) 756 log_list = log_dict.setdefault("logs", []) 757 log_list += warnings 758 for warning in warnings: 759 sys.stdout.write(warning + '\n') 760 else: 761 # there are sub-jobs, so put the warnings in there 762 log_list = log_dict.get("logs", []) 763 if log_list: 764 last_line = log_list[-1] 765 for key in sorted(log_dict.iterkeys()): 766 if key != "logs": 767 self._process_warnings(last_line, 768 log_dict[key], 769 warnings) 770 771 772 def write(self, data): 773 # first check for any new console warnings 774 warnings = self.job._read_warnings() 775 self._process_warnings(self.last_line, self.logs, warnings) 776 # now process the newest data written out 777 data = self.leftover + data 778 lines = data.split("\n") 779 # process every line but the last one 780 for line in lines[:-1]: 781 self._process_line(line) 782 # save the last line for later processing 783 # since we may not have the whole line yet 784 self.leftover = lines[-1] 785 786 787 def flush(self): 788 sys.stdout.flush() 789 790 791 def close(self): 792 if self.leftover: 793 self._process_line(self.leftover) 794 self._process_logs() 795 self.flush() 796 797 798SiteAutotest = client_utils.import_site_class( 799 __file__, "autotest_lib.server.site_autotest", "SiteAutotest", 800 BaseAutotest) 801 802class Autotest(SiteAutotest): 803 pass 804