# server_job.py revision bfb32f8d1b5b83506f7a91dd34efcc6e1feede5d
1""" 2The main job wrapper for the server side. 3 4This is the core infrastructure. Derived from the client side job.py 5 6Copyright Martin J. Bligh, Andy Whitcroft 2007 7""" 8 9__author__ = """ 10Martin J. Bligh <mbligh@google.com> 11Andy Whitcroft <apw@shadowen.org> 12""" 13 14import getpass, os, sys, re, stat, tempfile, time, select, subprocess, traceback 15 16from autotest_lib.client.bin import fd_stack 17from autotest_lib.client.common_lib import error, log 18from autotest_lib.server import test, subcommand 19from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils 20from autotest_lib.client.common_lib import utils, packages 21 22 23# load up a control segment 24# these are all stored in <server_dir>/control_segments 25def load_control_segment(name): 26 server_dir = os.path.dirname(os.path.abspath(__file__)) 27 script_file = os.path.join(server_dir, "control_segments", name) 28 if os.path.exists(script_file): 29 return file(script_file).read() 30 else: 31 return "" 32 33 34preamble = """\ 35import os, sys 36 37from autotest_lib.server import hosts, autotest, kvm, git, standalone_profiler 38from autotest_lib.server import source_kernel, rpm_kernel, deb_kernel 39from autotest_lib.server import git_kernel 40from autotest_lib.server.subcommand import * 41from autotest_lib.server.utils import run, get_tmp_dir, sh_escape 42from autotest_lib.server.utils import parse_machine 43from autotest_lib.client.common_lib.error import * 44from autotest_lib.client.common_lib import barrier 45 46autotest.Autotest.job = job 47hosts.Host.job = job 48barrier = barrier.barrier 49if len(machines) > 1: 50 open('.machines', 'w').write('\\n'.join(machines) + '\\n') 51""" 52 53client_wrapper = """ 54at = autotest.Autotest() 55 56def run_client(machine): 57 hostname, user, passwd, port = parse_machine( 58 machine, ssh_user, ssh_port, ssh_pass) 59 60 host = hosts.create_host(hostname, user=user, port=port, password=passwd) 61 host.log_kernel() 62 at.run(control, host=host) 
63 64job.parallel_simple(run_client, machines) 65""" 66 67crashdumps = """ 68def crashdumps(machine): 69 hostname, user, passwd, port = parse_machine(machine, ssh_user, 70 ssh_port, ssh_pass) 71 72 host = hosts.create_host(hostname, user=user, port=port, 73 initialize=False, password=passwd) 74 host.get_crashdumps(test_start_time) 75 76job.parallel_simple(crashdumps, machines, log=False) 77""" 78 79 80crashinfo = """ 81def crashinfo(machine): 82 hostname, user, passwd, port = parse_machine(machine, ssh_user, 83 ssh_port, ssh_pass) 84 85 host = hosts.create_host(hostname, user=user, port=port, 86 initialize=False, password=passwd) 87 host.get_crashinfo(test_start_time) 88 89job.parallel_simple(crashinfo, machines, log=False) 90""" 91 92 93reboot_segment="""\ 94def reboot(machine): 95 hostname, user, passwd, port = parse_machine(machine, ssh_user, 96 ssh_port, ssh_pass) 97 98 host = hosts.create_host(hostname, user=user, port=port, 99 initialize=False, password=passwd) 100 host.reboot() 101 102job.parallel_simple(reboot, machines, log=False) 103""" 104 105install="""\ 106def install(machine): 107 hostname, user, passwd, port = parse_machine(machine, ssh_user, 108 ssh_port, ssh_pass) 109 110 host = hosts.create_host(hostname, user=user, port=port, 111 initialize=False, password=passwd) 112 host.machine_install() 113 114job.parallel_simple(install, machines, log=False) 115""" 116 117# load up the verifier control segment, with an optional site-specific hook 118verify = load_control_segment("site_verify") 119verify += load_control_segment("verify") 120 121# load up the repair control segment, with an optional site-specific hook 122repair = load_control_segment("site_repair") 123repair += load_control_segment("repair") 124 125 126# load up site-specific code for generating site-specific job data 127try: 128 import site_job 129 get_site_job_data = site_job.get_site_job_data 130 del site_job 131except ImportError: 132 # by default provide a stub that generates no site data 
133 def get_site_job_data(job): 134 return {} 135 136 137class base_server_job(object): 138 """The actual job against which we do everything. 139 140 Properties: 141 autodir 142 The top level autotest directory (/usr/local/autotest). 143 serverdir 144 <autodir>/server/ 145 clientdir 146 <autodir>/client/ 147 conmuxdir 148 <autodir>/conmux/ 149 testdir 150 <autodir>/server/tests/ 151 site_testdir 152 <autodir>/server/site_tests/ 153 control 154 the control file for this job 155 """ 156 157 STATUS_VERSION = 1 158 159 160 def __init__(self, control, args, resultdir, label, user, machines, 161 client=False, parse_job='', 162 ssh_user='root', ssh_port=22, ssh_pass=''): 163 """ 164 control 165 The control file (pathname of) 166 args 167 args to pass to the control file 168 resultdir 169 where to throw the results 170 label 171 label for the job 172 user 173 Username for the job (email address) 174 client 175 True if a client-side control file 176 """ 177 path = os.path.dirname(__file__) 178 self.autodir = os.path.abspath(os.path.join(path, '..')) 179 self.serverdir = os.path.join(self.autodir, 'server') 180 self.testdir = os.path.join(self.serverdir, 'tests') 181 self.site_testdir = os.path.join(self.serverdir, 'site_tests') 182 self.tmpdir = os.path.join(self.serverdir, 'tmp') 183 self.conmuxdir = os.path.join(self.autodir, 'conmux') 184 self.clientdir = os.path.join(self.autodir, 'client') 185 self.toolsdir = os.path.join(self.autodir, 'client/tools') 186 if control: 187 self.control = open(control, 'r').read() 188 self.control = re.sub('\r', '', self.control) 189 else: 190 self.control = None 191 self.resultdir = resultdir 192 if not os.path.exists(resultdir): 193 os.mkdir(resultdir) 194 self.debugdir = os.path.join(resultdir, 'debug') 195 if not os.path.exists(self.debugdir): 196 os.mkdir(self.debugdir) 197 self.status = os.path.join(resultdir, 'status') 198 self.label = label 199 self.user = user 200 self.args = args 201 self.machines = machines 202 self.client = 
client 203 self.record_prefix = '' 204 self.warning_loggers = set() 205 self.ssh_user = ssh_user 206 self.ssh_port = ssh_port 207 self.ssh_pass = ssh_pass 208 self.run_test_cleanup = True 209 210 self.stdout = fd_stack.fd_stack(1, sys.stdout) 211 self.stderr = fd_stack.fd_stack(2, sys.stderr) 212 213 if not os.access(self.tmpdir, os.W_OK): 214 try: 215 os.makedirs(self.tmpdir, 0700) 216 except os.error, e: 217 # Thrown if the directory already exists, which it may. 218 pass 219 220 if (not os.access(self.tmpdir, os.W_OK) or 221 not os.path.isdir(self.tmpdir)): 222 self.tmpdir = os.path.join(tempfile.gettempdir(), 223 'autotest-' + getpass.getuser()) 224 try: 225 os.makedirs(self.tmpdir, 0700) 226 except os.error, e: 227 # Thrown if the directory already exists, which it may. 228 # If the problem was something other than the 229 # directory already existing, this chmod should throw as well 230 # exception. 231 os.chmod(self.tmpdir, stat.S_IRWXU) 232 233 if os.path.exists(self.status): 234 os.unlink(self.status) 235 job_data = {'label' : label, 'user' : user, 236 'hostname' : ','.join(machines), 237 'status_version' : str(self.STATUS_VERSION)} 238 job_data.update(get_site_job_data(self)) 239 utils.write_keyval(self.resultdir, job_data) 240 241 self.parse_job = parse_job 242 if self.parse_job and len(machines) == 1: 243 self.using_parser = True 244 self.init_parser(resultdir) 245 else: 246 self.using_parser = False 247 self.pkgmgr = packages.PackageManager( 248 self.autodir, run_function_dargs={'timeout':600}) 249 self.pkgdir = os.path.join(self.autodir, 'packages') 250 251 252 def init_parser(self, resultdir): 253 """Start the continuous parsing of resultdir. 
This sets up 254 the database connection and inserts the basic job object into 255 the database if necessary.""" 256 # redirect parser debugging to .parse.log 257 parse_log = os.path.join(resultdir, '.parse.log') 258 parse_log = open(parse_log, 'w', 0) 259 tko_utils.redirect_parser_debugging(parse_log) 260 # create a job model object and set up the db 261 self.results_db = tko_db.db(autocommit=True) 262 self.parser = status_lib.parser(self.STATUS_VERSION) 263 self.job_model = self.parser.make_job(resultdir) 264 self.parser.start(self.job_model) 265 # check if a job already exists in the db and insert it if 266 # it does not 267 job_idx = self.results_db.find_job(self.parse_job) 268 if job_idx is None: 269 self.results_db.insert_job(self.parse_job, 270 self.job_model) 271 else: 272 machine_idx = self.results_db.lookup_machine( 273 self.job_model.machine) 274 self.job_model.index = job_idx 275 self.job_model.machine_idx = machine_idx 276 277 278 def cleanup_parser(self): 279 """This should be called after the server job is finished 280 to carry out any remaining cleanup (e.g. 
flushing any 281 remaining test results to the results db)""" 282 if not self.using_parser: 283 return 284 final_tests = self.parser.end() 285 for test in final_tests: 286 self.__insert_test(test) 287 self.using_parser = False 288 289 290 def verify(self): 291 if not self.machines: 292 raise error.AutoservError( 293 'No machines specified to verify') 294 try: 295 namespace = {'machines' : self.machines, 'job' : self, 296 'ssh_user' : self.ssh_user, 297 'ssh_port' : self.ssh_port, 298 'ssh_pass' : self.ssh_pass} 299 self._execute_code(preamble + verify, namespace) 300 except Exception, e: 301 msg = ('Verify failed\n' + str(e) + '\n' 302 + traceback.format_exc()) 303 self.record('ABORT', None, None, msg) 304 raise 305 306 307 def repair(self, host_protection): 308 if not self.machines: 309 raise error.AutoservError('No machines specified to repair') 310 namespace = {'machines': self.machines, 'job': self, 311 'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port, 312 'ssh_pass': self.ssh_pass, 313 'protection_level': host_protection} 314 # no matter what happens during repair, go on to try to reverify 315 try: 316 self._execute_code(preamble + repair, namespace) 317 except Exception, exc: 318 print 'Exception occured during repair' 319 traceback.print_exc() 320 self.verify() 321 322 323 def precheck(self): 324 """ 325 perform any additional checks in derived classes. 326 """ 327 pass 328 329 330 def enable_external_logging(self): 331 """Start or restart external logging mechanism. 332 """ 333 pass 334 335 336 def disable_external_logging(self): 337 """ Pause or stop external logging mechanism. 338 """ 339 pass 340 341 342 def enable_test_cleanup(self): 343 """ By default tests run test.cleanup """ 344 self.run_test_cleanup = True 345 346 347 def disable_test_cleanup(self): 348 """ By default tests do not run test.cleanup """ 349 self.run_test_cleanup = False 350 351 352 def use_external_logging(self): 353 """Return True if external logging should be used. 
354 """ 355 return False 356 357 358 def parallel_simple(self, function, machines, log=True, timeout=None): 359 """Run 'function' using parallel_simple, with an extra 360 wrapper to handle the necessary setup for continuous parsing, 361 if possible. If continuous parsing is already properly 362 initialized then this should just work.""" 363 is_forking = not (len(machines) == 1 and 364 self.machines == machines) 365 if self.parse_job and is_forking and log: 366 def wrapper(machine): 367 self.parse_job += "/" + machine 368 self.using_parser = True 369 self.machines = [machine] 370 self.resultdir = os.path.join(self.resultdir, 371 machine) 372 os.chdir(self.resultdir) 373 self.init_parser(self.resultdir) 374 result = function(machine) 375 self.cleanup_parser() 376 return result 377 elif len(machines) > 1 and log: 378 def wrapper(machine): 379 self.resultdir = os.path.join(self.resultdir, machine) 380 os.chdir(self.resultdir) 381 result = function(machine) 382 return result 383 else: 384 wrapper = function 385 subcommand.parallel_simple(wrapper, machines, log, timeout) 386 387 388 def run(self, reboot = False, install_before = False, 389 install_after = False, collect_crashdumps = True, 390 namespace = {}): 391 # use a copy so changes don't affect the original dictionary 392 namespace = namespace.copy() 393 machines = self.machines 394 395 self.aborted = False 396 namespace['machines'] = machines 397 namespace['args'] = self.args 398 namespace['job'] = self 399 namespace['ssh_user'] = self.ssh_user 400 namespace['ssh_port'] = self.ssh_port 401 namespace['ssh_pass'] = self.ssh_pass 402 test_start_time = int(time.time()) 403 404 os.chdir(self.resultdir) 405 406 self.enable_external_logging() 407 status_log = os.path.join(self.resultdir, 'status.log') 408 collect_crashinfo = True 409 try: 410 if install_before and machines: 411 self._execute_code(preamble + install, namespace) 412 if self.client: 413 namespace['control'] = self.control 414 open('control', 
'w').write(self.control) 415 open('control.srv', 'w').write(client_wrapper) 416 server_control = client_wrapper 417 else: 418 open('control.srv', 'w').write(self.control) 419 server_control = self.control 420 self._execute_code(preamble + server_control, namespace) 421 422 # disable crashinfo collection if we get this far without error 423 collect_crashinfo = False 424 finally: 425 if machines and (collect_crashdumps or collect_crashinfo): 426 namespace['test_start_time'] = test_start_time 427 if collect_crashinfo: 428 script = crashinfo # includes crashdumps 429 else: 430 script = crashdumps 431 self._execute_code(preamble + script, namespace) 432 self.disable_external_logging() 433 if reboot and machines: 434 self._execute_code(preamble + reboot_segment, namespace) 435 if install_after and machines: 436 self._execute_code(preamble + install, namespace) 437 438 439 def run_test(self, url, *args, **dargs): 440 """Summon a test object and run it. 441 442 tag 443 tag to add to testname 444 url 445 url of the test to run 446 """ 447 448 (group, testname) = self.pkgmgr.get_package_name(url, 'test') 449 450 tag = dargs.pop('tag', None) 451 if tag: 452 testname += '.' 
+ tag 453 subdir = testname 454 455 outputdir = os.path.join(self.resultdir, subdir) 456 if os.path.exists(outputdir): 457 msg = ("%s already exists, test <%s> may have" 458 " already run with tag <%s>" 459 % (outputdir, testname, tag) ) 460 raise error.TestError(msg) 461 os.mkdir(outputdir) 462 463 def group_func(): 464 try: 465 test.runtest(self, url, tag, args, dargs) 466 except error.TestBaseException, e: 467 self.record(e.exit_status, subdir, testname, str(e)) 468 raise 469 except Exception, e: 470 info = str(e) + "\n" + traceback.format_exc() 471 self.record('FAIL', subdir, testname, info) 472 raise 473 else: 474 self.record('GOOD', subdir, testname, 475 'completed successfully') 476 477 result, exc_info = self._run_group(testname, subdir, group_func) 478 if exc_info and isinstance(exc_info[1], error.TestBaseException): 479 return False 480 elif exc_info: 481 raise exc_info[0], exc_info[1], exc_info[2] 482 else: 483 return True 484 485 486 def _run_group(self, name, subdir, function, *args, **dargs): 487 """\ 488 Underlying method for running something inside of a group. 
489 """ 490 result, exc_info = None, None 491 old_record_prefix = self.record_prefix 492 try: 493 self.record('START', subdir, name) 494 self.record_prefix += '\t' 495 try: 496 result = function(*args, **dargs) 497 finally: 498 self.record_prefix = old_record_prefix 499 except error.TestBaseException, e: 500 self.record("END %s" % e.exit_status, subdir, name, str(e)) 501 exc_info = sys.exc_info() 502 except Exception, e: 503 err_msg = str(e) + '\n' 504 err_msg += traceback.format_exc() 505 self.record('END ABORT', subdir, name, err_msg) 506 raise error.JobError(name + ' failed\n' + traceback.format_exc()) 507 else: 508 self.record('END GOOD', subdir, name) 509 510 return result, exc_info 511 512 513 def run_group(self, function, *args, **dargs): 514 """\ 515 function: 516 subroutine to run 517 *args: 518 arguments for the function 519 """ 520 521 name = function.__name__ 522 523 # Allow the tag for the group to be specified. 524 tag = dargs.pop('tag', None) 525 if tag: 526 name = tag 527 528 return self._run_group(name, None, function, *args, **dargs)[0] 529 530 531 def run_reboot(self, reboot_func, get_kernel_func): 532 """\ 533 A specialization of run_group meant specifically for handling 534 a reboot. Includes support for capturing the kernel version 535 after the reboot. 536 537 reboot_func: a function that carries out the reboot 538 539 get_kernel_func: a function that returns a string 540 representing the kernel version. 
541 """ 542 543 old_record_prefix = self.record_prefix 544 try: 545 self.record('START', None, 'reboot') 546 self.record_prefix += '\t' 547 reboot_func() 548 except Exception, e: 549 self.record_prefix = old_record_prefix 550 err_msg = str(e) + '\n' + traceback.format_exc() 551 self.record('END FAIL', None, 'reboot', err_msg) 552 else: 553 kernel = get_kernel_func() 554 self.record_prefix = old_record_prefix 555 self.record('END GOOD', None, 'reboot', 556 optional_fields={"kernel": kernel}) 557 558 559 def record(self, status_code, subdir, operation, status='', 560 optional_fields=None): 561 """ 562 Record job-level status 563 564 The intent is to make this file both machine parseable and 565 human readable. That involves a little more complexity, but 566 really isn't all that bad ;-) 567 568 Format is <status code>\t<subdir>\t<operation>\t<status> 569 570 status code: see common_lib.log.is_valid_status() 571 for valid status definition 572 573 subdir: MUST be a relevant subdirectory in the results, 574 or None, which will be represented as '----' 575 576 operation: description of what you ran (e.g. "dbench", or 577 "mkfs -t foobar /dev/sda9") 578 579 status: error message or "completed sucessfully" 580 581 ------------------------------------------------------------ 582 583 Initial tabs indicate indent levels for grouping, and is 584 governed by self.record_prefix 585 586 multiline messages have secondary lines prefaced by a double 587 space (' ') 588 589 Executing this method will trigger the logging of all new 590 warnings to date from the various console loggers. 
591 """ 592 # poll all our warning loggers for new warnings 593 warnings = self._read_warnings() 594 for timestamp, msg in warnings: 595 self._record("WARN", None, None, msg, timestamp) 596 597 # write out the actual status log line 598 self._record(status_code, subdir, operation, status, 599 optional_fields=optional_fields) 600 601 602 def _read_warnings(self): 603 warnings = [] 604 while True: 605 # pull in a line of output from every logger that has 606 # output ready to be read 607 loggers, _, _ = select.select(self.warning_loggers, 608 [], [], 0) 609 closed_loggers = set() 610 for logger in loggers: 611 line = logger.readline() 612 # record any broken pipes (aka line == empty) 613 if len(line) == 0: 614 closed_loggers.add(logger) 615 continue 616 timestamp, msg = line.split('\t', 1) 617 warnings.append((int(timestamp), msg.strip())) 618 619 # stop listening to loggers that are closed 620 self.warning_loggers -= closed_loggers 621 622 # stop if none of the loggers have any output left 623 if not loggers: 624 break 625 626 # sort into timestamp order 627 warnings.sort() 628 return warnings 629 630 631 def _render_record(self, status_code, subdir, operation, status='', 632 epoch_time=None, record_prefix=None, 633 optional_fields=None): 634 """ 635 Internal Function to generate a record to be written into a 636 status log. For use by server_job.* classes only. 
637 """ 638 if subdir: 639 if re.match(r'[\n\t]', subdir): 640 raise ValueError( 641 'Invalid character in subdir string') 642 substr = subdir 643 else: 644 substr = '----' 645 646 if not log.is_valid_status(status_code): 647 raise ValueError('Invalid status code supplied: %s' % 648 status_code) 649 if not operation: 650 operation = '----' 651 if re.match(r'[\n\t]', operation): 652 raise ValueError( 653 'Invalid character in operation string') 654 operation = operation.rstrip() 655 status = status.rstrip() 656 status = re.sub(r"\t", " ", status) 657 # Ensure any continuation lines are marked so we can 658 # detect them in the status file to ensure it is parsable. 659 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status) 660 661 if not optional_fields: 662 optional_fields = {} 663 664 # Generate timestamps for inclusion in the logs 665 if epoch_time is None: 666 epoch_time = int(time.time()) 667 local_time = time.localtime(epoch_time) 668 optional_fields["timestamp"] = str(epoch_time) 669 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", 670 local_time) 671 672 fields = [status_code, substr, operation] 673 fields += ["%s=%s" % x for x in optional_fields.iteritems()] 674 fields.append(status) 675 676 if record_prefix is None: 677 record_prefix = self.record_prefix 678 679 msg = '\t'.join(str(x) for x in fields) 680 681 return record_prefix + msg + '\n' 682 683 684 def _record_prerendered(self, msg): 685 """ 686 Record a pre-rendered msg into the status logs. The only 687 change this makes to the message is to add on the local 688 indentation. Should not be called outside of server_job.* 689 classes. Unlike _record, this does not write the message 690 to standard output. 
691 """ 692 lines = [] 693 status_file = os.path.join(self.resultdir, 'status.log') 694 status_log = open(status_file, 'a') 695 for line in msg.splitlines(): 696 line = self.record_prefix + line + '\n' 697 lines.append(line) 698 status_log.write(line) 699 status_log.close() 700 self.__parse_status(lines) 701 702 703 def _execute_code(self, code, scope): 704 exec(code, scope, scope) 705 706 707 def _record(self, status_code, subdir, operation, status='', 708 epoch_time=None, optional_fields=None): 709 """ 710 Actual function for recording a single line into the status 711 logs. Should never be called directly, only by job.record as 712 this would bypass the console monitor logging. 713 """ 714 715 msg = self._render_record(status_code, subdir, operation, 716 status, epoch_time, 717 optional_fields=optional_fields) 718 719 720 status_file = os.path.join(self.resultdir, 'status.log') 721 sys.stdout.write(msg) 722 open(status_file, "a").write(msg) 723 if subdir: 724 test_dir = os.path.join(self.resultdir, subdir) 725 status_file = os.path.join(test_dir, 'status.log') 726 open(status_file, "a").write(msg) 727 self.__parse_status(msg.splitlines()) 728 729 730 def __parse_status(self, new_lines): 731 if not self.using_parser: 732 return 733 new_tests = self.parser.process_lines(new_lines) 734 for test in new_tests: 735 self.__insert_test(test) 736 737 738 def __insert_test(self, test): 739 """ An internal method to insert a new test result into the 740 database. This method will not raise an exception, even if an 741 error occurs during the insert, to avoid failing a test 742 simply because of unexpected database issues.""" 743 try: 744 self.results_db.insert_test(self.job_model, test) 745 except Exception: 746 msg = ("WARNING: An unexpected error occured while " 747 "inserting test results into the database. 
" 748 "Ignoring error.\n" + traceback.format_exc()) 749 print >> sys.stderr, msg 750 751 752 753class log_collector(object): 754 def __init__(self, host, client_tag, results_dir): 755 self.host = host 756 if not client_tag: 757 client_tag = "default" 758 self.client_results_dir = os.path.join(host.get_autodir(), "results", 759 client_tag) 760 self.server_results_dir = results_dir 761 762 763 def collect_client_job_results(self): 764 """ A method that collects all the current results of a running 765 client job into the results dir. By default does nothing as no 766 client job is running, but when running a client job you can override 767 this with something that will actually do something. """ 768 769 # make an effort to wait for the machine to come up 770 try: 771 self.host.wait_up(timeout=30) 772 except error.AutoservError: 773 # don't worry about any errors, we'll try and 774 # get the results anyway 775 pass 776 777 778 # Copy all dirs in default to results_dir 779 try: 780 keyval_path = self._prepare_for_copying_logs() 781 self.host.get_file(self.client_results_dir + '/', 782 self.server_results_dir) 783 self._process_copied_logs(keyval_path) 784 self._postprocess_copied_logs() 785 except Exception: 786 # well, don't stop running just because we couldn't get logs 787 print "Unexpected error copying test result logs, continuing ..." 
788 traceback.print_exc(file=sys.stdout) 789 790 791 def _prepare_for_copying_logs(self): 792 server_keyval = os.path.join(self.server_results_dir, 'keyval') 793 if not os.path.exists(server_keyval): 794 # Client-side keyval file can be copied directly 795 return 796 797 # Copy client-side keyval to temporary location 798 suffix = '.keyval_%s' % self.host.hostname 799 fd, keyval_path = tempfile.mkstemp(suffix) 800 os.close(fd) 801 try: 802 client_keyval = os.path.join(self.client_results_dir, 'keyval') 803 try: 804 self.host.get_file(client_keyval, keyval_path) 805 finally: 806 # We will squirrel away the client side keyval 807 # away and move it back when we are done 808 remote_temp_dir = self.host.get_tmp_dir() 809 self.temp_keyval_path = os.path.join(remote_temp_dir, "keyval") 810 self.host.run('mv %s %s' % (client_keyval, 811 self.temp_keyval_path)) 812 except (error.AutoservRunError, error.AutoservSSHTimeout): 813 print "Prepare for copying logs failed" 814 return keyval_path 815 816 817 def _process_copied_logs(self, keyval_path): 818 if not keyval_path: 819 # Client-side keyval file was copied directly 820 return 821 822 # Append contents of keyval_<host> file to keyval file 823 try: 824 # Read in new and old keyval files 825 new_keyval = utils.read_keyval(keyval_path) 826 old_keyval = utils.read_keyval(self.server_results_dir) 827 # 'Delete' from new keyval entries that are in both 828 tmp_keyval = {} 829 for key, val in new_keyval.iteritems(): 830 if key not in old_keyval: 831 tmp_keyval[key] = val 832 # Append new info to keyval file 833 utils.write_keyval(self.server_results_dir, tmp_keyval) 834 # Delete keyval_<host> file 835 os.remove(keyval_path) 836 except IOError: 837 print "Process copied logs failed" 838 839 840 def _postprocess_copied_logs(self): 841 # we can now put our keyval file back 842 client_keyval = os.path.join(self.client_results_dir, 'keyval') 843 try: 844 self.host.run('mv %s %s' % (self.temp_keyval_path, client_keyval)) 845 except 
Exception: 846 pass 847 848 849# a file-like object for catching stderr from an autotest client and 850# extracting status logs from it 851class client_logger(object): 852 """Partial file object to write to both stdout and 853 the status log file. We only implement those methods 854 utils.run() actually calls. 855 """ 856 status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$") 857 test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$") 858 extract_indent = re.compile(r"^(\t*).*$") 859 860 def __init__(self, host, tag, server_results_dir): 861 self.host = host 862 self.job = host.job 863 self.log_collector = log_collector(host, tag, server_results_dir) 864 self.leftover = "" 865 self.last_line = "" 866 self.logs = {} 867 868 869 def _process_log_dict(self, log_dict): 870 log_list = log_dict.pop("logs", []) 871 for key in sorted(log_dict.iterkeys()): 872 log_list += self._process_log_dict(log_dict.pop(key)) 873 return log_list 874 875 876 def _process_logs(self): 877 """Go through the accumulated logs in self.log and print them 878 out to stdout and the status log. Note that this processes 879 logs in an ordering where: 880 881 1) logs to different tags are never interleaved 882 2) logs to x.y come before logs to x.y.z for all z 883 3) logs to x.y come before x.z whenever y < z 884 885 Note that this will in general not be the same as the 886 chronological ordering of the logs. However, if a chronological 887 ordering is desired that one can be reconstructed from the 888 status log by looking at timestamp lines.""" 889 log_list = self._process_log_dict(self.logs) 890 for line in log_list: 891 self.job._record_prerendered(line + '\n') 892 if log_list: 893 self.last_line = log_list[-1] 894 895 896 def _process_quoted_line(self, tag, line): 897 """Process a line quoted with an AUTOTEST_STATUS flag. If the 898 tag is blank then we want to push out all the data we've been 899 building up in self.logs, and then the newest line. 
If the 900 tag is not blank, then push the line into the logs for handling 901 later.""" 902 print line 903 if tag == "": 904 self._process_logs() 905 self.job._record_prerendered(line + '\n') 906 self.last_line = line 907 else: 908 tag_parts = [int(x) for x in tag.split(".")] 909 log_dict = self.logs 910 for part in tag_parts: 911 log_dict = log_dict.setdefault(part, {}) 912 log_list = log_dict.setdefault("logs", []) 913 log_list.append(line) 914 915 916 def _process_line(self, line): 917 """Write out a line of data to the appropriate stream. Status 918 lines sent by autotest will be prepended with 919 "AUTOTEST_STATUS", and all other lines are ssh error 920 messages.""" 921 status_match = self.status_parser.search(line) 922 test_complete_match = self.test_complete_parser.search(line) 923 if status_match: 924 tag, line = status_match.groups() 925 self._process_quoted_line(tag, line) 926 elif test_complete_match: 927 fifo_path, = test_complete_match.groups() 928 self.log_collector.collect_client_job_results() 929 self.host.run("echo A > %s" % fifo_path) 930 else: 931 print line 932 933 934 def _format_warnings(self, last_line, warnings): 935 # use the indentation of whatever the last log line was 936 indent = self.extract_indent.match(last_line).group(1) 937 # if the last line starts a new group, add an extra indent 938 if last_line.lstrip('\t').startswith("START\t"): 939 indent += '\t' 940 return [self.job._render_record("WARN", None, None, msg, 941 timestamp, indent).rstrip('\n') 942 for timestamp, msg in warnings] 943 944 945 def _process_warnings(self, last_line, log_dict, warnings): 946 if log_dict.keys() in ([], ["logs"]): 947 # there are no sub-jobs, just append the warnings here 948 warnings = self._format_warnings(last_line, warnings) 949 log_list = log_dict.setdefault("logs", []) 950 log_list += warnings 951 for warning in warnings: 952 sys.stdout.write(warning + '\n') 953 else: 954 # there are sub-jobs, so put the warnings in there 955 log_list = 
log_dict.get("logs", []) 956 if log_list: 957 last_line = log_list[-1] 958 for key in sorted(log_dict.iterkeys()): 959 if key != "logs": 960 self._process_warnings(last_line, 961 log_dict[key], 962 warnings) 963 964 965 def write(self, data): 966 # first check for any new console warnings 967 warnings = self.job._read_warnings() 968 self._process_warnings(self.last_line, self.logs, warnings) 969 # now process the newest data written out 970 data = self.leftover + data 971 lines = data.split("\n") 972 # process every line but the last one 973 for line in lines[:-1]: 974 self._process_line(line) 975 # save the last line for later processing 976 # since we may not have the whole line yet 977 self.leftover = lines[-1] 978 979 980 def flush(self): 981 sys.stdout.flush() 982 983 984 def close(self): 985 if self.leftover: 986 self._process_line(self.leftover) 987 self._process_logs() 988 self.flush() 989 990 991# site_server_job.py may be non-existant or empty, make sure that an 992# appropriate site_server_job class is created nevertheless 993try: 994 from autotest_lib.server.site_server_job import site_server_job 995except ImportError: 996 class site_server_job(object): 997 pass 998 999class server_job(site_server_job, base_server_job): 1000 pass 1001