server_job.py revision cf8d492a6eafdd03d88ad9dad676ae737ec898ec
1""" 2The main job wrapper for the server side. 3 4This is the core infrastructure. Derived from the client side job.py 5 6Copyright Martin J. Bligh, Andy Whitcroft 2007 7""" 8 9import getpass, os, sys, re, stat, tempfile, time, select, subprocess 10import traceback, shutil, warnings, fcntl, pickle, logging 11import itertools 12from autotest_lib.client.bin import sysinfo 13from autotest_lib.client.common_lib import error, log, utils, packages 14from autotest_lib.client.common_lib import logging_manager 15from autotest_lib.server import test, subcommand, profilers 16from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils 17 18 19def _control_segment_path(name): 20 """Get the pathname of the named control segment file.""" 21 server_dir = os.path.dirname(os.path.abspath(__file__)) 22 return os.path.join(server_dir, "control_segments", name) 23 24 25CLIENT_CONTROL_FILENAME = 'control' 26SERVER_CONTROL_FILENAME = 'control.srv' 27MACHINES_FILENAME = '.machines' 28 29CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper') 30CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps') 31CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo') 32INSTALL_CONTROL_FILE = _control_segment_path('install') 33CLEANUP_CONTROL_FILE = _control_segment_path('cleanup') 34 35VERIFY_CONTROL_FILE = _control_segment_path('verify') 36REPAIR_CONTROL_FILE = _control_segment_path('repair') 37 38 39# by default provide a stub that generates no site data 40def _get_site_job_data_dummy(job): 41 return {} 42 43 44# load up site-specific code for generating site-specific job data 45get_site_job_data = utils.import_site_function(__file__, 46 "autotest_lib.server.site_server_job", "get_site_job_data", 47 _get_site_job_data_dummy) 48 49 50class base_server_job(object): 51 """ 52 The actual job against which we do everything. 53 54 Properties: 55 autodir 56 The top level autotest directory (/usr/local/autotest). 57 serverdir 58 <autodir>/server/ 59 clientdir 60 <autodir>/client/ 61 conmuxdir 62 <autodir>/conmux/ 63 testdir 64 <autodir>/server/tests/ 65 site_testdir 66 <autodir>/server/site_tests/ 67 control 68 the control file for this job 69 drop_caches_between_iterations 70 drop the pagecache between each iteration 71 default_profile_only 72 default value for the test.execute profile_only parameter 73 """ 74 75 STATUS_VERSION = 1 76 77 def __init__(self, control, args, resultdir, label, user, machines, 78 client=False, parse_job='', 79 ssh_user='root', ssh_port=22, ssh_pass='', 80 group_name='', tag=''): 81 """ 82 Create a server side job object. 83 84 @param control: The pathname of the control file. 85 @param args: Passed to the control file. 86 @param resultdir: Where to throw the results. 87 @param label: Description of the job. 88 @param user: Username for the job (email address). 89 @param client: True if this is a client-side control file. 90 @param parse_job: string, if supplied it is the job execution tag that 91 the results will be passed through to the TKO parser with. 92 @param ssh_user: The SSH username. [root] 93 @param ssh_port: The SSH port number. [22] 94 @param ssh_pass: The SSH passphrase, if needed. 95 @param group_name: If supplied, this will be written out as 96 host_group_name in the keyvals file for the parser. 97 @param tag: The job execution tag from the scheduler. [optional] 98 """ 99 path = os.path.dirname(__file__) 100 self.autodir = os.path.abspath(os.path.join(path, '..')) 101 self.serverdir = os.path.join(self.autodir, 'server') 102 self.testdir = os.path.join(self.serverdir, 'tests') 103 self.site_testdir = os.path.join(self.serverdir, 'site_tests') 104 self.tmpdir = os.path.join(self.serverdir, 'tmp') 105 self.conmuxdir = os.path.join(self.autodir, 'conmux') 106 self.clientdir = os.path.join(self.autodir, 'client') 107 self.toolsdir = os.path.join(self.autodir, 'client/tools') 108 if control: 109 self.control = self._load_control_file(control) 110 else: 111 self.control = '' 112 self.resultdir = resultdir 113 self.uncollected_log_file = None 114 if resultdir: 115 self.uncollected_log_file = os.path.join(resultdir, 116 'uncollected_logs') 117 self.debugdir = os.path.join(resultdir, 'debug') 118 119 if not os.path.exists(resultdir): 120 os.mkdir(resultdir) 121 if not os.path.exists(self.debugdir): 122 os.mkdir(self.debugdir) 123 self.label = label 124 self.user = user 125 self.args = args 126 self.machines = machines 127 self.client = client 128 self.record_prefix = '' 129 self.warning_loggers = set() 130 self.warning_manager = warning_manager() 131 self.ssh_user = ssh_user 132 self.ssh_port = ssh_port 133 self.ssh_pass = ssh_pass 134 self.tag = tag 135 self.default_profile_only = False 136 self.run_test_cleanup = True 137 self.last_boot_tag = None 138 self.hosts = set() 139 self.drop_caches_between_iterations = False 140 141 self.logging = logging_manager.get_logging_manager( 142 manage_stdout_and_stderr=True, redirect_fds=True) 143 subcommand.logging_manager_object = self.logging 144 145 if resultdir: 146 self.sysinfo = sysinfo.sysinfo(self.resultdir) 147 self.profilers = profilers.profilers(self) 148 149 if not os.access(self.tmpdir, os.W_OK): 150 try: 151 os.makedirs(self.tmpdir, 0700) 152 except os.error, e: 153 # Thrown if the directory already exists, which it may. 154 pass 155 156 if not (os.access(self.tmpdir, os.W_OK) and os.path.isdir(self.tmpdir)): 157 self.tmpdir = os.path.join(tempfile.gettempdir(), 158 'autotest-' + getpass.getuser()) 159 try: 160 os.makedirs(self.tmpdir, 0700) 161 except os.error, e: 162 # Thrown if the directory already exists, which it may. 163 # If the problem was something other than the 164 # directory already existing, this chmod should throw as well 165 # exception. 166 os.chmod(self.tmpdir, stat.S_IRWXU) 167 168 job_data = {'label' : label, 'user' : user, 169 'hostname' : ','.join(machines), 170 'status_version' : str(self.STATUS_VERSION), 171 'job_started' : str(int(time.time()))} 172 if group_name: 173 job_data['host_group_name'] = group_name 174 if self.resultdir: 175 # only write these keyvals out on the first job in a resultdir 176 if 'job_started' not in utils.read_keyval(self.resultdir): 177 job_data.update(get_site_job_data(self)) 178 utils.write_keyval(self.resultdir, job_data) 179 180 self.parse_job = parse_job 181 if self.parse_job and len(machines) == 1: 182 self.using_parser = True 183 self.init_parser(resultdir) 184 else: 185 self.using_parser = False 186 self.pkgmgr = packages.PackageManager(self.autodir, 187 run_function_dargs={'timeout':600}) 188 self.pkgdir = os.path.join(self.autodir, 'packages') 189 190 self.num_tests_run = 0 191 self.num_tests_failed = 0 192 193 self._register_subcommand_hooks() 194 self._test_tag_prefix = None 195 196 197 @staticmethod 198 def _load_control_file(path): 199 f = open(path) 200 try: 201 control_file = f.read() 202 finally: 203 f.close() 204 return re.sub('\r', '', control_file) 205 206 207 def _register_subcommand_hooks(self): 208 """ 209 Register some hooks into the subcommand modules that allow us 210 to properly clean up self.hosts created in forked subprocesses. 211 """ 212 def on_fork(cmd): 213 self._existing_hosts_on_fork = set(self.hosts) 214 def on_join(cmd): 215 new_hosts = self.hosts - self._existing_hosts_on_fork 216 for host in new_hosts: 217 host.close() 218 subcommand.subcommand.register_fork_hook(on_fork) 219 subcommand.subcommand.register_join_hook(on_join) 220 221 222 def init_parser(self, resultdir): 223 """ 224 Start the continuous parsing of resultdir. This sets up 225 the database connection and inserts the basic job object into 226 the database if necessary. 227 """ 228 # redirect parser debugging to .parse.log 229 parse_log = os.path.join(resultdir, '.parse.log') 230 parse_log = open(parse_log, 'w', 0) 231 tko_utils.redirect_parser_debugging(parse_log) 232 # create a job model object and set up the db 233 self.results_db = tko_db.db(autocommit=True) 234 self.parser = status_lib.parser(self.STATUS_VERSION) 235 self.job_model = self.parser.make_job(resultdir) 236 self.parser.start(self.job_model) 237 # check if a job already exists in the db and insert it if 238 # it does not 239 job_idx = self.results_db.find_job(self.parse_job) 240 if job_idx is None: 241 self.results_db.insert_job(self.parse_job, self.job_model) 242 else: 243 machine_idx = self.results_db.lookup_machine(self.job_model.machine) 244 self.job_model.index = job_idx 245 self.job_model.machine_idx = machine_idx 246 247 248 def cleanup_parser(self): 249 """ 250 This should be called after the server job is finished 251 to carry out any remaining cleanup (e.g. flushing any 252 remaining test results to the results db) 253 """ 254 if not self.using_parser: 255 return 256 final_tests = self.parser.end() 257 for test in final_tests: 258 self.__insert_test(test) 259 self.using_parser = False 260 261 262 def verify(self): 263 if not self.machines: 264 raise error.AutoservError('No machines specified to verify') 265 if self.resultdir: 266 os.chdir(self.resultdir) 267 try: 268 namespace = {'machines' : self.machines, 'job' : self, 269 'ssh_user' : self.ssh_user, 270 'ssh_port' : self.ssh_port, 271 'ssh_pass' : self.ssh_pass} 272 self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False) 273 except Exception, e: 274 msg = ('Verify failed\n' + str(e) + '\n' + traceback.format_exc()) 275 self.record('ABORT', None, None, msg) 276 raise 277 278 279 def repair(self, host_protection): 280 if not self.machines: 281 raise error.AutoservError('No machines specified to repair') 282 if self.resultdir: 283 os.chdir(self.resultdir) 284 namespace = {'machines': self.machines, 'job': self, 285 'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port, 286 'ssh_pass': self.ssh_pass, 287 'protection_level': host_protection} 288 289 self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False) 290 291 292 def precheck(self): 293 """ 294 perform any additional checks in derived classes. 295 """ 296 pass 297 298 299 def enable_external_logging(self): 300 """ 301 Start or restart external logging mechanism. 302 """ 303 pass 304 305 306 def disable_external_logging(self): 307 """ 308 Pause or stop external logging mechanism. 309 """ 310 pass 311 312 313 def set_default_profile_only(self, val): 314 """ Set the default_profile_only mode. """ 315 self.default_profile_only = val 316 317 318 def enable_test_cleanup(self): 319 """ 320 By default tests run test.cleanup 321 """ 322 self.run_test_cleanup = True 323 324 325 def disable_test_cleanup(self): 326 """ 327 By default tests do not run test.cleanup 328 """ 329 self.run_test_cleanup = False 330 331 332 def use_external_logging(self): 333 """ 334 Return True if external logging should be used. 335 """ 336 return False 337 338 339 def _make_parallel_wrapper(self, function, machines, log): 340 """Wrap function as appropriate for calling by parallel_simple.""" 341 is_forking = not (len(machines) == 1 and self.machines == machines) 342 if self.parse_job and is_forking and log: 343 def wrapper(machine): 344 self.parse_job += "/" + machine 345 self.using_parser = True 346 self.machines = [machine] 347 self.resultdir = os.path.join(self.resultdir, machine) 348 os.chdir(self.resultdir) 349 utils.write_keyval(self.resultdir, {"hostname": machine}) 350 self.init_parser(self.resultdir) 351 result = function(machine) 352 self.cleanup_parser() 353 return result 354 elif len(machines) > 1 and log: 355 def wrapper(machine): 356 self.resultdir = os.path.join(self.resultdir, machine) 357 os.chdir(self.resultdir) 358 machine_data = {'hostname' : machine, 359 'status_version' : str(self.STATUS_VERSION)} 360 utils.write_keyval(self.resultdir, machine_data) 361 result = function(machine) 362 return result 363 else: 364 wrapper = function 365 return wrapper 366 367 368 def parallel_simple(self, function, machines, log=True, timeout=None, 369 return_results=False): 370 """ 371 Run 'function' using parallel_simple, with an extra wrapper to handle 372 the necessary setup for continuous parsing, if possible. If continuous 373 parsing is already properly initialized then this should just work. 374 375 @param function: A callable to run in parallel given each machine. 376 @param machines: A list of machine names to be passed one per subcommand 377 invocation of function. 378 @param log: If True, output will be written to output in a subdirectory 379 named after each machine. 380 @param timeout: Seconds after which the function call should timeout. 381 @param return_results: If True instead of an AutoServError being raised 382 on any error a list of the results|exceptions from the function 383 called on each arg is returned. [default: False] 384 385 @raises error.AutotestError: If any of the functions failed. 386 """ 387 wrapper = self._make_parallel_wrapper(function, machines, log) 388 return subcommand.parallel_simple(wrapper, machines, 389 log=log, timeout=timeout, 390 return_results=return_results) 391 392 393 def parallel_on_machines(self, function, machines, timeout=None): 394 """ 395 @param function: Called in parallel with one machine as its argument. 396 @param machines: A list of machines to call function(machine) on. 397 @param timeout: Seconds after which the function call should timeout. 398 399 @returns A list of machines on which function(machine) returned 400 without raising an exception. 401 """ 402 results = self.parallel_simple(function, machines, timeout=timeout, 403 return_results=True) 404 success_machines = [] 405 for result, machine in itertools.izip(results, machines): 406 if not isinstance(result, Exception): 407 success_machines.append(machine) 408 return success_machines 409 410 411 USE_TEMP_DIR = object() 412 def run(self, cleanup=False, install_before=False, install_after=False, 413 collect_crashdumps=True, namespace={}, control=None, 414 control_file_dir=None, only_collect_crashinfo=False): 415 # for a normal job, make sure the uncollected logs file exists 416 # for a crashinfo-only run it should already exist, bail out otherwise 417 if self.resultdir and not os.path.exists(self.uncollected_log_file): 418 if only_collect_crashinfo: 419 # if this is a crashinfo-only run, and there were no existing 420 # uncollected logs, just bail out early 421 logging.info("No existing uncollected logs, " 422 "skipping crashinfo collection") 423 return 424 else: 425 log_file = open(self.uncollected_log_file, "w") 426 pickle.dump([], log_file) 427 log_file.close() 428 429 # use a copy so changes don't affect the original dictionary 430 namespace = namespace.copy() 431 machines = self.machines 432 if control is None: 433 control = self.control 434 if control_file_dir is None: 435 control_file_dir = self.resultdir 436 437 self.aborted = False 438 namespace['machines'] = machines 439 namespace['args'] = self.args 440 namespace['job'] = self 441 namespace['ssh_user'] = self.ssh_user 442 namespace['ssh_port'] = self.ssh_port 443 namespace['ssh_pass'] = self.ssh_pass 444 test_start_time = int(time.time()) 445 446 if self.resultdir: 447 os.chdir(self.resultdir) 448 # touch status.log so that the parser knows a job is running here 449 open(self.get_status_log_path(), 'a').close() 450 self.enable_external_logging() 451 452 collect_crashinfo = True 453 temp_control_file_dir = None 454 try: 455 try: 456 if install_before and machines: 457 self._execute_code(INSTALL_CONTROL_FILE, namespace) 458 459 if only_collect_crashinfo: 460 return 461 462 # determine the dir to write the control files to 463 cfd_specified = (control_file_dir 464 and control_file_dir is not self.USE_TEMP_DIR) 465 if cfd_specified: 466 temp_control_file_dir = None 467 else: 468 temp_control_file_dir = tempfile.mkdtemp( 469 suffix='temp_control_file_dir') 470 control_file_dir = temp_control_file_dir 471 server_control_file = os.path.join(control_file_dir, 472 SERVER_CONTROL_FILENAME) 473 client_control_file = os.path.join(control_file_dir, 474 CLIENT_CONTROL_FILENAME) 475 if self.client: 476 namespace['control'] = control 477 utils.open_write_close(client_control_file, control) 478 shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE, 479 server_control_file) 480 else: 481 utils.open_write_close(server_control_file, control) 482 logging.info("Processing control file") 483 self._execute_code(server_control_file, namespace) 484 logging.info("Finished processing control file") 485 486 # no error occured, so we don't need to collect crashinfo 487 collect_crashinfo = False 488 except: 489 try: 490 logging.exception( 491 'Exception escaped control file, job aborting:') 492 except: 493 pass # don't let logging exceptions here interfere 494 raise 495 finally: 496 if temp_control_file_dir: 497 # Clean up temp directory used for copies of the control files 498 try: 499 shutil.rmtree(temp_control_file_dir) 500 except Exception, e: 501 logging.warn('Could not remove temp directory %s: %s', 502 temp_control_file_dir, e) 503 504 if machines and (collect_crashdumps or collect_crashinfo): 505 namespace['test_start_time'] = test_start_time 506 if collect_crashinfo: 507 # includes crashdumps 508 self._execute_code(CRASHINFO_CONTROL_FILE, namespace) 509 else: 510 self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace) 511 if self.uncollected_log_file: 512 os.remove(self.uncollected_log_file) 513 self.disable_external_logging() 514 if cleanup and machines: 515 self._execute_code(CLEANUP_CONTROL_FILE, namespace) 516 if install_after and machines: 517 self._execute_code(INSTALL_CONTROL_FILE, namespace) 518 519 520 def set_test_tag_prefix(self, tag=''): 521 """ 522 Set tag to be prepended (separated by a '.') to test name of all 523 following run_test steps. 524 """ 525 self._test_tag_prefix = tag 526 527 528 def run_test(self, url, *args, **dargs): 529 """ 530 Summon a test object and run it. 531 532 tag 533 tag to add to testname 534 url 535 url of the test to run 536 """ 537 538 (group, testname) = self.pkgmgr.get_package_name(url, 'test') 539 540 tag = dargs.pop('tag', None) 541 if tag is None: 542 tag = self._test_tag_prefix 543 elif self._test_tag_prefix: 544 tag = '%s.%s' % (self._test_tag_prefix, tag) 545 546 if tag: 547 testname += '.' + str(tag) 548 subdir = testname 549 550 outputdir = os.path.join(self.resultdir, subdir) 551 if os.path.exists(outputdir): 552 msg = ("%s already exists, test <%s> may have" 553 " already run with tag <%s>" % (outputdir, testname, tag)) 554 raise error.TestError(msg) 555 os.mkdir(outputdir) 556 557 def group_func(): 558 try: 559 test.runtest(self, url, tag, args, dargs) 560 except error.TestBaseException, e: 561 self.record(e.exit_status, subdir, testname, str(e)) 562 raise 563 except Exception, e: 564 info = str(e) + "\n" + traceback.format_exc() 565 self.record('FAIL', subdir, testname, info) 566 raise 567 else: 568 self.record('GOOD', subdir, testname, 'completed successfully') 569 570 result, exc_info = self._run_group(testname, subdir, group_func) 571 if exc_info and isinstance(exc_info[1], error.TestBaseException): 572 return False 573 elif exc_info: 574 raise exc_info[0], exc_info[1], exc_info[2] 575 else: 576 return True 577 578 579 def _run_group(self, name, subdir, function, *args, **dargs): 580 """\ 581 Underlying method for running something inside of a group. 582 """ 583 result, exc_info = None, None 584 old_record_prefix = self.record_prefix 585 try: 586 self.record('START', subdir, name) 587 self.record_prefix += '\t' 588 try: 589 result = function(*args, **dargs) 590 finally: 591 self.record_prefix = old_record_prefix 592 except error.TestBaseException, e: 593 self.record("END %s" % e.exit_status, subdir, name) 594 exc_info = sys.exc_info() 595 except Exception, e: 596 err_msg = str(e) + '\n' 597 err_msg += traceback.format_exc() 598 self.record('END ABORT', subdir, name, err_msg) 599 raise error.JobError(name + ' failed\n' + traceback.format_exc()) 600 else: 601 self.record('END GOOD', subdir, name) 602 603 return result, exc_info 604 605 606 def run_group(self, function, *args, **dargs): 607 """\ 608 function: 609 subroutine to run 610 *args: 611 arguments for the function 612 """ 613 614 name = function.__name__ 615 616 # Allow the tag for the group to be specified. 617 tag = dargs.pop('tag', None) 618 if tag: 619 name = tag 620 621 return self._run_group(name, None, function, *args, **dargs)[0] 622 623 624 def run_reboot(self, reboot_func, get_kernel_func): 625 """\ 626 A specialization of run_group meant specifically for handling 627 a reboot. Includes support for capturing the kernel version 628 after the reboot. 629 630 reboot_func: a function that carries out the reboot 631 632 get_kernel_func: a function that returns a string 633 representing the kernel version. 634 """ 635 636 old_record_prefix = self.record_prefix 637 try: 638 self.record('START', None, 'reboot') 639 self.record_prefix += '\t' 640 reboot_func() 641 except Exception, e: 642 self.record_prefix = old_record_prefix 643 err_msg = str(e) + '\n' + traceback.format_exc() 644 self.record('END FAIL', None, 'reboot', err_msg) 645 raise 646 else: 647 kernel = get_kernel_func() 648 self.record_prefix = old_record_prefix 649 self.record('END GOOD', None, 'reboot', 650 optional_fields={"kernel": kernel}) 651 652 653 def run_control(self, path): 654 """Execute a control file found at path (relative to the autotest 655 path). Intended for executing a control file within a control file, 656 not for running the top-level job control file.""" 657 path = os.path.join(self.autodir, path) 658 control_file = self._load_control_file(path) 659 self.run(control=control_file, control_file_dir=self.USE_TEMP_DIR) 660 661 662 def add_sysinfo_command(self, command, logfile=None, on_every_test=False): 663 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile), 664 on_every_test) 665 666 667 def add_sysinfo_logfile(self, file, on_every_test=False): 668 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) 669 670 671 def _add_sysinfo_loggable(self, loggable, on_every_test): 672 if on_every_test: 673 self.sysinfo.test_loggables.add(loggable) 674 else: 675 self.sysinfo.boot_loggables.add(loggable) 676 677 678 def record(self, status_code, subdir, operation, status='', 679 optional_fields=None): 680 """ 681 Record job-level status 682 683 The intent is to make this file both machine parseable and 684 human readable. That involves a little more complexity, but 685 really isn't all that bad ;-) 686 687 Format is <status code>\t<subdir>\t<operation>\t<status> 688 689 status code: see common_lib.log.is_valid_status() 690 for valid status definition 691 692 subdir: MUST be a relevant subdirectory in the results, 693 or None, which will be represented as '----' 694 695 operation: description of what you ran (e.g. "dbench", or 696 "mkfs -t foobar /dev/sda9") 697 698 status: error message or "completed sucessfully" 699 700 ------------------------------------------------------------ 701 702 Initial tabs indicate indent levels for grouping, and is 703 governed by self.record_prefix 704 705 multiline messages have secondary lines prefaced by a double 706 space (' ') 707 708 Executing this method will trigger the logging of all new 709 warnings to date from the various console loggers. 710 """ 711 # poll all our warning loggers for new warnings 712 warnings = self._read_warnings() 713 old_record_prefix = self.record_prefix 714 try: 715 if status_code.startswith("END "): 716 self.record_prefix += "\t" 717 for timestamp, msg in warnings: 718 self._record("WARN", None, None, msg, timestamp) 719 finally: 720 self.record_prefix = old_record_prefix 721 722 # write out the actual status log line 723 self._record(status_code, subdir, operation, status, 724 optional_fields=optional_fields) 725 726 727 def _read_warnings(self): 728 """Poll all the warning loggers and extract any new warnings that have 729 been logged. If the warnings belong to a category that is currently 730 disabled, this method will discard them and they will no longer be 731 retrievable. 732 733 Returns a list of (timestamp, message) tuples, where timestamp is an 734 integer epoch timestamp.""" 735 warnings = [] 736 while True: 737 # pull in a line of output from every logger that has 738 # output ready to be read 739 loggers, _, _ = select.select(self.warning_loggers, [], [], 0) 740 closed_loggers = set() 741 for logger in loggers: 742 line = logger.readline() 743 # record any broken pipes (aka line == empty) 744 if len(line) == 0: 745 closed_loggers.add(logger) 746 continue 747 # parse out the warning 748 timestamp, msgtype, msg = line.split('\t', 2) 749 timestamp = int(timestamp) 750 # if the warning is valid, add it to the results 751 if self.warning_manager.is_valid(timestamp, msgtype): 752 warnings.append((timestamp, msg.strip())) 753 754 # stop listening to loggers that are closed 755 self.warning_loggers -= closed_loggers 756 757 # stop if none of the loggers have any output left 758 if not loggers: 759 break 760 761 # sort into timestamp order 762 warnings.sort() 763 return warnings 764 765 766 def disable_warnings(self, warning_type): 767 self.warning_manager.disable_warnings(warning_type) 768 self.record("INFO", None, None, 769 "disabling %s warnings" % warning_type, 770 {"warnings.disable": warning_type}) 771 772 773 def enable_warnings(self, warning_type): 774 self.warning_manager.enable_warnings(warning_type) 775 self.record("INFO", None, None, 776 "enabling %s warnings" % warning_type, 777 {"warnings.enable": warning_type}) 778 779 780 def get_status_log_path(self, subdir=None): 781 """Return the path to the job status log. 782 783 @param subdir - Optional paramter indicating that you want the path 784 to a subdirectory status log. 785 786 @returns The path where the status log should be. 787 """ 788 if self.resultdir: 789 if subdir: 790 return os.path.join(self.resultdir, subdir, "status.log") 791 else: 792 return os.path.join(self.resultdir, "status.log") 793 else: 794 return None 795 796 797 def _update_uncollected_logs_list(self, update_func): 798 """Updates the uncollected logs list in a multi-process safe manner. 799 800 @param update_func - a function that updates the list of uncollected 801 logs. Should take one parameter, the list to be updated. 802 """ 803 if self.uncollected_log_file: 804 log_file = open(self.uncollected_log_file, "r+") 805 fcntl.flock(log_file, fcntl.LOCK_EX) 806 try: 807 uncollected_logs = pickle.load(log_file) 808 update_func(uncollected_logs) 809 log_file.seek(0) 810 log_file.truncate() 811 pickle.dump(uncollected_logs, log_file) 812 log_file.flush() 813 finally: 814 fcntl.flock(log_file, fcntl.LOCK_UN) 815 log_file.close() 816 817 818 def add_client_log(self, hostname, remote_path, local_path): 819 """Adds a new set of client logs to the list of uncollected logs, 820 to allow for future log recovery. 821 822 @param host - the hostname of the machine holding the logs 823 @param remote_path - the directory on the remote machine holding logs 824 @param local_path - the local directory to copy the logs into 825 """ 826 def update_func(logs_list): 827 logs_list.append((hostname, remote_path, local_path)) 828 self._update_uncollected_logs_list(update_func) 829 830 831 def remove_client_log(self, hostname, remote_path, local_path): 832 """Removes a set of client logs from the list of uncollected logs, 833 to allow for future log recovery. 834 835 @param host - the hostname of the machine holding the logs 836 @param remote_path - the directory on the remote machine holding logs 837 @param local_path - the local directory to copy the logs into 838 """ 839 def update_func(logs_list): 840 logs_list.remove((hostname, remote_path, local_path)) 841 self._update_uncollected_logs_list(update_func) 842 843 844 def _render_record(self, status_code, subdir, operation, status='', 845 epoch_time=None, record_prefix=None, 846 optional_fields=None): 847 """ 848 Internal Function to generate a record to be written into a 849 status log. For use by server_job.* classes only. 850 """ 851 if subdir: 852 if re.match(r'[\n\t]', subdir): 853 raise ValueError('Invalid character in subdir string') 854 substr = subdir 855 else: 856 substr = '----' 857 858 if not log.is_valid_status(status_code): 859 raise ValueError('Invalid status code supplied: %s' % status_code) 860 if not operation: 861 operation = '----' 862 if re.match(r'[\n\t]', operation): 863 raise ValueError('Invalid character in operation string') 864 operation = operation.rstrip() 865 status = status.rstrip() 866 status = re.sub(r"\t", " ", status) 867 # Ensure any continuation lines are marked so we can 868 # detect them in the status file to ensure it is parsable. 869 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status) 870 871 if not optional_fields: 872 optional_fields = {} 873 874 # Generate timestamps for inclusion in the logs 875 if epoch_time is None: 876 epoch_time = int(time.time()) 877 local_time = time.localtime(epoch_time) 878 optional_fields["timestamp"] = str(epoch_time) 879 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", 880 local_time) 881 882 fields = [status_code, substr, operation] 883 fields += ["%s=%s" % x for x in optional_fields.iteritems()] 884 fields.append(status) 885 886 if record_prefix is None: 887 record_prefix = self.record_prefix 888 889 msg = '\t'.join(str(x) for x in fields) 890 return record_prefix + msg + '\n' 891 892 893 def _record_prerendered(self, msg): 894 """ 895 Record a pre-rendered msg into the status logs. The only 896 change this makes to the message is to add on the local 897 indentation. Should not be called outside of server_job.* 898 classes. Unlike _record, this does not write the message 899 to standard output. 900 """ 901 lines = [] 902 status_file = self.get_status_log_path() 903 status_log = open(status_file, 'a') 904 for line in msg.splitlines(): 905 line = self.record_prefix + line + '\n' 906 lines.append(line) 907 status_log.write(line) 908 status_log.close() 909 self.__parse_status(lines) 910 911 912 def _fill_server_control_namespace(self, namespace, protect=True): 913 """ 914 Prepare a namespace to be used when executing server control files. 915 916 This sets up the control file API by importing modules and making them 917 available under the appropriate names within namespace. 918 919 For use by _execute_code(). 920 921 Args: 922 namespace: The namespace dictionary to fill in. 923 protect: Boolean. If True (the default) any operation that would 924 clobber an existing entry in namespace will cause an error. 925 Raises: 926 error.AutoservError: When a name would be clobbered by import. 927 """ 928 def _import_names(module_name, names=()): 929 """ 930 Import a module and assign named attributes into namespace. 931 932 Args: 933 module_name: The string module name. 934 names: A limiting list of names to import from module_name. If 935 empty (the default), all names are imported from the module 936 similar to a "from foo.bar import *" statement. 937 Raises: 938 error.AutoservError: When a name being imported would clobber 939 a name already in namespace. 940 """ 941 module = __import__(module_name, {}, {}, names) 942 943 # No names supplied? Import * from the lowest level module. 944 # (Ugh, why do I have to implement this part myself?) 945 if not names: 946 for submodule_name in module_name.split('.')[1:]: 947 module = getattr(module, submodule_name) 948 if hasattr(module, '__all__'): 949 names = getattr(module, '__all__') 950 else: 951 names = dir(module) 952 953 # Install each name into namespace, checking to make sure it 954 # doesn't override anything that already exists. 955 for name in names: 956 # Check for conflicts to help prevent future problems. 957 if name in namespace and protect: 958 if namespace[name] is not getattr(module, name): 959 raise error.AutoservError('importing name ' 960 '%s from %s %r would override %r' % 961 (name, module_name, getattr(module, name), 962 namespace[name])) 963 else: 964 # Encourage cleanliness and the use of __all__ for a 965 # more concrete API with less surprises on '*' imports. 966 warnings.warn('%s (%r) being imported from %s for use ' 967 'in server control files is not the ' 968 'first occurrance of that import.' % 969 (name, namespace[name], module_name)) 970 971 namespace[name] = getattr(module, name) 972 973 974 # This is the equivalent of prepending a bunch of import statements to 975 # the front of the control script. 976 namespace.update(os=os, sys=sys, logging=logging) 977 _import_names('autotest_lib.server', 978 ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler', 979 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel')) 980 _import_names('autotest_lib.server.subcommand', 981 ('parallel', 'parallel_simple', 'subcommand')) 982 _import_names('autotest_lib.server.utils', 983 ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine')) 984 _import_names('autotest_lib.client.common_lib.error') 985 _import_names('autotest_lib.client.common_lib.barrier', ('barrier',)) 986 987 # Inject ourself as the job object into other classes within the API. 988 # (Yuck, this injection is a gross thing be part of a public API. -gps) 989 # 990 # XXX Base & SiteAutotest do not appear to use .job. Who does? 991 namespace['autotest'].Autotest.job = self 992 # server.hosts.base_classes.Host uses .job. 993 namespace['hosts'].Host.job = self 994 995 996 def _execute_code(self, code_file, namespace, protect=True): 997 """ 998 Execute code using a copy of namespace as a server control script. 999 1000 Unless protect_namespace is explicitly set to False, the dict will not 1001 be modified. 1002 1003 Args: 1004 code_file: The filename of the control file to execute. 1005 namespace: A dict containing names to make available during execution. 1006 protect: Boolean. If True (the default) a copy of the namespace dict 1007 is used during execution to prevent the code from modifying its 1008 contents outside of this function. If False the raw dict is 1009 passed in and modifications will be allowed. 1010 """ 1011 if protect: 1012 namespace = namespace.copy() 1013 self._fill_server_control_namespace(namespace, protect=protect) 1014 # TODO: Simplify and get rid of the special cases for only 1 machine. 1015 if len(self.machines) > 1: 1016 machines_text = '\n'.join(self.machines) + '\n' 1017 # Only rewrite the file if it does not match our machine list. 1018 try: 1019 machines_f = open(MACHINES_FILENAME, 'r') 1020 existing_machines_text = machines_f.read() 1021 machines_f.close() 1022 except EnvironmentError: 1023 existing_machines_text = None 1024 if machines_text != existing_machines_text: 1025 utils.open_write_close(MACHINES_FILENAME, machines_text) 1026 execfile(code_file, namespace, namespace) 1027 1028 1029 def _record(self, status_code, subdir, operation, status='', 1030 epoch_time=None, optional_fields=None): 1031 """ 1032 Actual function for recording a single line into the status 1033 logs. Should never be called directly, only by job.record as 1034 this would bypass the console monitor logging. 1035 """ 1036 1037 msg = self._render_record(status_code, subdir, operation, status, 1038 epoch_time, optional_fields=optional_fields) 1039 1040 status_file = self.get_status_log_path() 1041 sys.stdout.write(msg) 1042 if status_file: 1043 open(status_file, "a").write(msg) 1044 if subdir: 1045 sub_status_file = self.get_status_log_path(subdir) 1046 open(sub_status_file, "a").write(msg) 1047 self.__parse_status(msg.splitlines()) 1048 1049 1050 def __parse_status(self, new_lines): 1051 if not self.using_parser: 1052 return 1053 new_tests = self.parser.process_lines(new_lines) 1054 for test in new_tests: 1055 self.__insert_test(test) 1056 1057 1058 def __insert_test(self, test): 1059 """ 1060 An internal method to insert a new test result into the 1061 database. This method will not raise an exception, even if an 1062 error occurs during the insert, to avoid failing a test 1063 simply because of unexpected database issues.""" 1064 self.num_tests_run += 1 1065 if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'): 1066 self.num_tests_failed += 1 1067 try: 1068 self.results_db.insert_test(self.job_model, test) 1069 except Exception: 1070 msg = ("WARNING: An unexpected error occured while " 1071 "inserting test results into the database. " 1072 "Ignoring error.\n" + traceback.format_exc()) 1073 print >> sys.stderr, msg 1074 1075 1076site_server_job = utils.import_site_class( 1077 __file__, "autotest_lib.server.site_server_job", "site_server_job", 1078 base_server_job) 1079 1080class server_job(site_server_job): 1081 pass 1082 1083 1084class warning_manager(object): 1085 """Class for controlling warning logs. Manages the enabling and disabling 1086 of warnings.""" 1087 def __init__(self): 1088 # a map of warning types to a list of disabled time intervals 1089 self.disabled_warnings = {} 1090 1091 1092 def is_valid(self, timestamp, warning_type): 1093 """Indicates if a warning (based on the time it occured and its type) 1094 is a valid warning. A warning is considered "invalid" if this type of 1095 warning was marked as "disabled" at the time the warning occured.""" 1096 disabled_intervals = self.disabled_warnings.get(warning_type, []) 1097 for start, end in disabled_intervals: 1098 if timestamp >= start and (end is None or timestamp < end): 1099 return False 1100 return True 1101 1102 1103 def disable_warnings(self, warning_type, current_time_func=time.time): 1104 """As of now, disables all further warnings of this type.""" 1105 intervals = self.disabled_warnings.setdefault(warning_type, []) 1106 if not intervals or intervals[-1][1] is not None: 1107 intervals.append((int(current_time_func()), None)) 1108 1109 1110 def enable_warnings(self, warning_type, current_time_func=time.time): 1111 """As of now, enables all further warnings of this type.""" 1112 intervals = self.disabled_warnings.get(warning_type, []) 1113 if intervals and intervals[-1][1] is None: 1114 intervals[-1] = (intervals[-1][0], int(current_time_func())) 1115