# server_job.py revision e29d0e442c8eefe07dfb2533e1a1dafc14b69e0b
1""" 2The main job wrapper for the server side. 3 4This is the core infrastructure. Derived from the client side job.py 5 6Copyright Martin J. Bligh, Andy Whitcroft 2007 7""" 8 9import getpass, os, sys, re, stat, tempfile, time, select, subprocess 10import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno 11from autotest_lib.client.bin import sysinfo 12from autotest_lib.client.common_lib import base_job 13from autotest_lib.client.common_lib import error, log, utils, packages 14from autotest_lib.client.common_lib import logging_manager 15from autotest_lib.server import test, subcommand, profilers 16from autotest_lib.server.hosts import abstract_ssh 17from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils 18 19 20def _control_segment_path(name): 21 """Get the pathname of the named control segment file.""" 22 server_dir = os.path.dirname(os.path.abspath(__file__)) 23 return os.path.join(server_dir, "control_segments", name) 24 25 26CLIENT_CONTROL_FILENAME = 'control' 27SERVER_CONTROL_FILENAME = 'control.srv' 28MACHINES_FILENAME = '.machines' 29 30CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper') 31CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps') 32CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo') 33INSTALL_CONTROL_FILE = _control_segment_path('install') 34CLEANUP_CONTROL_FILE = _control_segment_path('cleanup') 35 36VERIFY_CONTROL_FILE = _control_segment_path('verify') 37REPAIR_CONTROL_FILE = _control_segment_path('repair') 38 39 40# by default provide a stub that generates no site data 41def _get_site_job_data_dummy(job): 42 return {} 43 44 45# load up site-specific code for generating site-specific job data 46get_site_job_data = utils.import_site_function(__file__, 47 "autotest_lib.server.site_server_job", "get_site_job_data", 48 _get_site_job_data_dummy) 49 50 51class status_indenter(base_job.status_indenter): 52 """Provide a simple integer-backed status indenter.""" 53 def 
__init__(self): 54 self._indent = 0 55 56 57 @property 58 def indent(self): 59 return self._indent 60 61 62 def increment(self): 63 self._indent += 1 64 65 66 def decrement(self): 67 self._indent -= 1 68 69 70 def get_context(self): 71 """Returns a context object for use by job.get_record_context.""" 72 class context(object): 73 def __init__(self, indenter, indent): 74 self._indenter = indenter 75 self._indent = indent 76 def restore(self): 77 self._indenter._indent = self._indent 78 return context(self, self._indent) 79 80 81class server_job_record_hook(object): 82 """The job.record hook for server job. Used to inject WARN messages from 83 the console or vlm whenever new logs are written, and to echo any logs 84 to INFO level logging. Implemented as a class so that it can use state to 85 block recursive calls, so that the hook can call job.record itself to 86 log WARN messages. 87 88 Depends on job._read_warnings and job._logger. 89 """ 90 def __init__(self, job): 91 self._job = job 92 self._being_called = False 93 94 95 def __call__(self, entry): 96 """A wrapper around the 'real' record hook, the _hook method, which 97 prevents recursion. This isn't making any effort to be threadsafe, 98 the intent is to outright block infinite recursion via a 99 job.record->_hook->job.record->_hook->job.record... 
chain.""" 100 if self._being_called: 101 return 102 self._being_called = True 103 try: 104 self._hook(self._job, entry) 105 finally: 106 self._being_called = False 107 108 109 @staticmethod 110 def _hook(job, entry): 111 """The core hook, which can safely call job.record.""" 112 entries = [] 113 # poll all our warning loggers for new warnings 114 for timestamp, msg in job._read_warnings(): 115 warning_entry = base_job.status_log_entry( 116 'WARN', None, None, msg, {}, timestamp=timestamp) 117 entries.append(warning_entry) 118 job.record_entry(warning_entry) 119 # echo rendered versions of all the status logs to info 120 entries.append(entry) 121 for entry in entries: 122 rendered_entry = job._logger.render_entry(entry) 123 logging.info(rendered_entry) 124 job._parse_status(rendered_entry) 125 126 127class base_server_job(base_job.base_job): 128 """The server-side concrete implementation of base_job. 129 130 Optional properties provided by this implementation: 131 serverdir 132 conmuxdir 133 134 num_tests_run 135 num_tests_failed 136 137 warning_manager 138 warning_loggers 139 """ 140 141 _STATUS_VERSION = 1 142 143 def __init__(self, control, args, resultdir, label, user, machines, 144 client=False, parse_job='', 145 ssh_user='root', ssh_port=22, ssh_pass='', 146 group_name='', tag='', 147 control_filename=SERVER_CONTROL_FILENAME): 148 """ 149 Create a server side job object. 150 151 @param control: The pathname of the control file. 152 @param args: Passed to the control file. 153 @param resultdir: Where to throw the results. 154 @param label: Description of the job. 155 @param user: Username for the job (email address). 156 @param client: True if this is a client-side control file. 157 @param parse_job: string, if supplied it is the job execution tag that 158 the results will be passed through to the TKO parser with. 159 @param ssh_user: The SSH username. [root] 160 @param ssh_port: The SSH port number. [22] 161 @param ssh_pass: The SSH passphrase, if needed. 
162 @param group_name: If supplied, this will be written out as 163 host_group_name in the keyvals file for the parser. 164 @param tag: The job execution tag from the scheduler. [optional] 165 @param control_filename: The filename where the server control file 166 should be written in the results directory. 167 """ 168 super(base_server_job, self).__init__(resultdir=resultdir) 169 170 path = os.path.dirname(__file__) 171 self.control = control 172 self._uncollected_log_file = os.path.join(self.resultdir, 173 'uncollected_logs') 174 debugdir = os.path.join(self.resultdir, 'debug') 175 if not os.path.exists(debugdir): 176 os.mkdir(debugdir) 177 178 if user: 179 self.user = user 180 else: 181 self.user = getpass.getuser() 182 183 self.args = args 184 self.machines = machines 185 self._client = client 186 self.warning_loggers = set() 187 self.warning_manager = warning_manager() 188 self._ssh_user = ssh_user 189 self._ssh_port = ssh_port 190 self._ssh_pass = ssh_pass 191 self.tag = tag 192 self.last_boot_tag = None 193 self.hosts = set() 194 self.drop_caches = False 195 self.drop_caches_between_iterations = False 196 self._control_filename = control_filename 197 198 self.logging = logging_manager.get_logging_manager( 199 manage_stdout_and_stderr=True, redirect_fds=True) 200 subcommand.logging_manager_object = self.logging 201 202 self.sysinfo = sysinfo.sysinfo(self.resultdir) 203 self.profilers = profilers.profilers(self) 204 205 job_data = {'label' : label, 'user' : user, 206 'hostname' : ','.join(machines), 207 'status_version' : str(self._STATUS_VERSION), 208 'job_started' : str(int(time.time()))} 209 if group_name: 210 job_data['host_group_name'] = group_name 211 212 # only write these keyvals out on the first job in a resultdir 213 if 'job_started' not in utils.read_keyval(self.resultdir): 214 job_data.update(get_site_job_data(self)) 215 utils.write_keyval(self.resultdir, job_data) 216 217 self._parse_job = parse_job 218 self._using_parser = (self._parse_job and 
len(machines) <= 1) 219 self.pkgmgr = packages.PackageManager( 220 self.autodir, run_function_dargs={'timeout':600}) 221 self.num_tests_run = 0 222 self.num_tests_failed = 0 223 224 self._register_subcommand_hooks() 225 226 # these components aren't usable on the server 227 self.bootloader = None 228 self.harness = None 229 230 # set up the status logger 231 self._indenter = status_indenter() 232 self._logger = base_job.status_logger( 233 self, self._indenter, 'status.log', 'status.log', 234 record_hook=server_job_record_hook(self)) 235 236 237 @classmethod 238 def _find_base_directories(cls): 239 """ 240 Determine locations of autodir, clientdir and serverdir. Assumes 241 that this file is located within serverdir and uses __file__ along 242 with relative paths to resolve the location. 243 """ 244 serverdir = os.path.abspath(os.path.dirname(__file__)) 245 autodir = os.path.normpath(os.path.join(serverdir, '..')) 246 clientdir = os.path.join(autodir, 'client') 247 return autodir, clientdir, serverdir 248 249 250 def _find_resultdir(self, resultdir): 251 """ 252 Determine the location of resultdir. For server jobs we expect one to 253 always be explicitly passed in to __init__, so just return that. 254 """ 255 if resultdir: 256 return os.path.normpath(resultdir) 257 else: 258 return None 259 260 261 def _get_status_logger(self): 262 """Return a reference to the status logger.""" 263 return self._logger 264 265 266 @staticmethod 267 def _load_control_file(path): 268 f = open(path) 269 try: 270 control_file = f.read() 271 finally: 272 f.close() 273 return re.sub('\r', '', control_file) 274 275 276 def _register_subcommand_hooks(self): 277 """ 278 Register some hooks into the subcommand modules that allow us 279 to properly clean up self.hosts created in forked subprocesses. 
280 """ 281 def on_fork(cmd): 282 self._existing_hosts_on_fork = set(self.hosts) 283 def on_join(cmd): 284 new_hosts = self.hosts - self._existing_hosts_on_fork 285 for host in new_hosts: 286 host.close() 287 subcommand.subcommand.register_fork_hook(on_fork) 288 subcommand.subcommand.register_join_hook(on_join) 289 290 291 def init_parser(self): 292 """ 293 Start the continuous parsing of self.resultdir. This sets up 294 the database connection and inserts the basic job object into 295 the database if necessary. 296 """ 297 if not self._using_parser: 298 return 299 # redirect parser debugging to .parse.log 300 parse_log = os.path.join(self.resultdir, '.parse.log') 301 parse_log = open(parse_log, 'w', 0) 302 tko_utils.redirect_parser_debugging(parse_log) 303 # create a job model object and set up the db 304 self.results_db = tko_db.db(autocommit=True) 305 self.parser = status_lib.parser(self._STATUS_VERSION) 306 self.job_model = self.parser.make_job(self.resultdir) 307 self.parser.start(self.job_model) 308 # check if a job already exists in the db and insert it if 309 # it does not 310 job_idx = self.results_db.find_job(self._parse_job) 311 if job_idx is None: 312 self.results_db.insert_job(self._parse_job, self.job_model) 313 else: 314 machine_idx = self.results_db.lookup_machine(self.job_model.machine) 315 self.job_model.index = job_idx 316 self.job_model.machine_idx = machine_idx 317 318 319 def cleanup_parser(self): 320 """ 321 This should be called after the server job is finished 322 to carry out any remaining cleanup (e.g. 
flushing any 323 remaining test results to the results db) 324 """ 325 if not self._using_parser: 326 return 327 final_tests = self.parser.end() 328 for test in final_tests: 329 self.__insert_test(test) 330 self._using_parser = False 331 332 333 def verify(self): 334 if not self.machines: 335 raise error.AutoservError('No machines specified to verify') 336 if self.resultdir: 337 os.chdir(self.resultdir) 338 try: 339 namespace = {'machines' : self.machines, 'job' : self, 340 'ssh_user' : self._ssh_user, 341 'ssh_port' : self._ssh_port, 342 'ssh_pass' : self._ssh_pass} 343 self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False) 344 except Exception, e: 345 msg = ('Verify failed\n' + str(e) + '\n' + traceback.format_exc()) 346 self.record('ABORT', None, None, msg) 347 raise 348 349 350 def repair(self, host_protection): 351 if not self.machines: 352 raise error.AutoservError('No machines specified to repair') 353 if self.resultdir: 354 os.chdir(self.resultdir) 355 namespace = {'machines': self.machines, 'job': self, 356 'ssh_user': self._ssh_user, 'ssh_port': self._ssh_port, 357 'ssh_pass': self._ssh_pass, 358 'protection_level': host_protection} 359 360 self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False) 361 362 363 def precheck(self): 364 """ 365 perform any additional checks in derived classes. 366 """ 367 pass 368 369 370 def enable_external_logging(self): 371 """ 372 Start or restart external logging mechanism. 373 """ 374 pass 375 376 377 def disable_external_logging(self): 378 """ 379 Pause or stop external logging mechanism. 380 """ 381 pass 382 383 384 def use_external_logging(self): 385 """ 386 Return True if external logging should be used. 
387 """ 388 return False 389 390 391 def _make_parallel_wrapper(self, function, machines, log): 392 """Wrap function as appropriate for calling by parallel_simple.""" 393 is_forking = not (len(machines) == 1 and self.machines == machines) 394 if self._parse_job and is_forking and log: 395 def wrapper(machine): 396 self._parse_job += "/" + machine 397 self._using_parser = True 398 self.machines = [machine] 399 self.push_execution_context(machine) 400 os.chdir(self.resultdir) 401 utils.write_keyval(self.resultdir, {"hostname": machine}) 402 self.init_parser() 403 result = function(machine) 404 self.cleanup_parser() 405 return result 406 elif len(machines) > 1 and log: 407 def wrapper(machine): 408 self.push_execution_context(machine) 409 os.chdir(self.resultdir) 410 machine_data = {'hostname' : machine, 411 'status_version' : str(self._STATUS_VERSION)} 412 utils.write_keyval(self.resultdir, machine_data) 413 result = function(machine) 414 return result 415 else: 416 wrapper = function 417 return wrapper 418 419 420 def parallel_simple(self, function, machines, log=True, timeout=None, 421 return_results=False): 422 """ 423 Run 'function' using parallel_simple, with an extra wrapper to handle 424 the necessary setup for continuous parsing, if possible. If continuous 425 parsing is already properly initialized then this should just work. 426 427 @param function: A callable to run in parallel given each machine. 428 @param machines: A list of machine names to be passed one per subcommand 429 invocation of function. 430 @param log: If True, output will be written to output in a subdirectory 431 named after each machine. 432 @param timeout: Seconds after which the function call should timeout. 433 @param return_results: If True instead of an AutoServError being raised 434 on any error a list of the results|exceptions from the function 435 called on each arg is returned. [default: False] 436 437 @raises error.AutotestError: If any of the functions failed. 
438 """ 439 wrapper = self._make_parallel_wrapper(function, machines, log) 440 return subcommand.parallel_simple(wrapper, machines, 441 log=log, timeout=timeout, 442 return_results=return_results) 443 444 445 def parallel_on_machines(self, function, machines, timeout=None): 446 """ 447 @param function: Called in parallel with one machine as its argument. 448 @param machines: A list of machines to call function(machine) on. 449 @param timeout: Seconds after which the function call should timeout. 450 451 @returns A list of machines on which function(machine) returned 452 without raising an exception. 453 """ 454 results = self.parallel_simple(function, machines, timeout=timeout, 455 return_results=True) 456 success_machines = [] 457 for result, machine in itertools.izip(results, machines): 458 if not isinstance(result, Exception): 459 success_machines.append(machine) 460 return success_machines 461 462 463 _USE_TEMP_DIR = object() 464 def run(self, cleanup=False, install_before=False, install_after=False, 465 collect_crashdumps=True, namespace={}, control=None, 466 control_file_dir=None, only_collect_crashinfo=False): 467 # for a normal job, make sure the uncollected logs file exists 468 # for a crashinfo-only run it should already exist, bail out otherwise 469 created_uncollected_logs = False 470 if self.resultdir and not os.path.exists(self._uncollected_log_file): 471 if only_collect_crashinfo: 472 # if this is a crashinfo-only run, and there were no existing 473 # uncollected logs, just bail out early 474 logging.info("No existing uncollected logs, " 475 "skipping crashinfo collection") 476 return 477 else: 478 log_file = open(self._uncollected_log_file, "w") 479 pickle.dump([], log_file) 480 log_file.close() 481 created_uncollected_logs = True 482 483 # use a copy so changes don't affect the original dictionary 484 namespace = namespace.copy() 485 machines = self.machines 486 if control is None: 487 if self.control is None: 488 control = '' 489 else: 490 control 
= self._load_control_file(self.control) 491 if control_file_dir is None: 492 control_file_dir = self.resultdir 493 494 self.aborted = False 495 namespace['machines'] = machines 496 namespace['args'] = self.args 497 namespace['job'] = self 498 namespace['ssh_user'] = self._ssh_user 499 namespace['ssh_port'] = self._ssh_port 500 namespace['ssh_pass'] = self._ssh_pass 501 test_start_time = int(time.time()) 502 503 if self.resultdir: 504 os.chdir(self.resultdir) 505 # touch status.log so that the parser knows a job is running here 506 open(self.get_status_log_path(), 'a').close() 507 self.enable_external_logging() 508 509 collect_crashinfo = True 510 temp_control_file_dir = None 511 try: 512 try: 513 if install_before and machines: 514 self._execute_code(INSTALL_CONTROL_FILE, namespace) 515 516 if only_collect_crashinfo: 517 return 518 519 # determine the dir to write the control files to 520 cfd_specified = (control_file_dir 521 and control_file_dir is not self._USE_TEMP_DIR) 522 if cfd_specified: 523 temp_control_file_dir = None 524 else: 525 temp_control_file_dir = tempfile.mkdtemp( 526 suffix='temp_control_file_dir') 527 control_file_dir = temp_control_file_dir 528 server_control_file = os.path.join(control_file_dir, 529 self._control_filename) 530 client_control_file = os.path.join(control_file_dir, 531 CLIENT_CONTROL_FILENAME) 532 if self._client: 533 namespace['control'] = control 534 utils.open_write_close(client_control_file, control) 535 shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE, 536 server_control_file) 537 else: 538 utils.open_write_close(server_control_file, control) 539 logging.info("Processing control file") 540 self._execute_code(server_control_file, namespace) 541 logging.info("Finished processing control file") 542 543 # no error occured, so we don't need to collect crashinfo 544 collect_crashinfo = False 545 except: 546 try: 547 logging.exception( 548 'Exception escaped control file, job aborting:') 549 except: 550 pass # don't let logging 
exceptions here interfere 551 raise 552 finally: 553 if temp_control_file_dir: 554 # Clean up temp directory used for copies of the control files 555 try: 556 shutil.rmtree(temp_control_file_dir) 557 except Exception, e: 558 logging.warn('Could not remove temp directory %s: %s', 559 temp_control_file_dir, e) 560 561 if machines and (collect_crashdumps or collect_crashinfo): 562 namespace['test_start_time'] = test_start_time 563 if collect_crashinfo: 564 # includes crashdumps 565 self._execute_code(CRASHINFO_CONTROL_FILE, namespace) 566 else: 567 self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace) 568 if self._uncollected_log_file and created_uncollected_logs: 569 os.remove(self._uncollected_log_file) 570 self.disable_external_logging() 571 if cleanup and machines: 572 self._execute_code(CLEANUP_CONTROL_FILE, namespace) 573 if install_after and machines: 574 self._execute_code(INSTALL_CONTROL_FILE, namespace) 575 576 577 def run_test(self, url, *args, **dargs): 578 """ 579 Summon a test object and run it. 
580 581 tag 582 tag to add to testname 583 url 584 url of the test to run 585 """ 586 group, testname = self.pkgmgr.get_package_name(url, 'test') 587 testname, subdir, tag = self._build_tagged_test_name(testname, dargs) 588 outputdir = self._make_test_outputdir(subdir) 589 590 def group_func(): 591 try: 592 test.runtest(self, url, tag, args, dargs) 593 except error.TestBaseException, e: 594 self.record(e.exit_status, subdir, testname, str(e)) 595 raise 596 except Exception, e: 597 info = str(e) + "\n" + traceback.format_exc() 598 self.record('FAIL', subdir, testname, info) 599 raise 600 else: 601 self.record('GOOD', subdir, testname, 'completed successfully') 602 603 result, exc_info = self._run_group(testname, subdir, group_func) 604 if exc_info and isinstance(exc_info[1], error.TestBaseException): 605 return False 606 elif exc_info: 607 raise exc_info[0], exc_info[1], exc_info[2] 608 else: 609 return True 610 611 612 def _run_group(self, name, subdir, function, *args, **dargs): 613 """\ 614 Underlying method for running something inside of a group. 615 """ 616 result, exc_info = None, None 617 try: 618 self.record('START', subdir, name) 619 result = function(*args, **dargs) 620 except error.TestBaseException, e: 621 self.record("END %s" % e.exit_status, subdir, name) 622 exc_info = sys.exc_info() 623 except Exception, e: 624 err_msg = str(e) + '\n' 625 err_msg += traceback.format_exc() 626 self.record('END ABORT', subdir, name, err_msg) 627 raise error.JobError(name + ' failed\n' + traceback.format_exc()) 628 else: 629 self.record('END GOOD', subdir, name) 630 631 return result, exc_info 632 633 634 def run_group(self, function, *args, **dargs): 635 """\ 636 function: 637 subroutine to run 638 *args: 639 arguments for the function 640 """ 641 642 name = function.__name__ 643 644 # Allow the tag for the group to be specified. 
645 tag = dargs.pop('tag', None) 646 if tag: 647 name = tag 648 649 return self._run_group(name, None, function, *args, **dargs)[0] 650 651 652 def run_reboot(self, reboot_func, get_kernel_func): 653 """\ 654 A specialization of run_group meant specifically for handling 655 a reboot. Includes support for capturing the kernel version 656 after the reboot. 657 658 reboot_func: a function that carries out the reboot 659 660 get_kernel_func: a function that returns a string 661 representing the kernel version. 662 """ 663 try: 664 self.record('START', None, 'reboot') 665 reboot_func() 666 except Exception, e: 667 err_msg = str(e) + '\n' + traceback.format_exc() 668 self.record('END FAIL', None, 'reboot', err_msg) 669 raise 670 else: 671 kernel = get_kernel_func() 672 self.record('END GOOD', None, 'reboot', 673 optional_fields={"kernel": kernel}) 674 675 676 def run_control(self, path): 677 """Execute a control file found at path (relative to the autotest 678 path). Intended for executing a control file within a control file, 679 not for running the top-level job control file.""" 680 path = os.path.join(self.autodir, path) 681 control_file = self._load_control_file(path) 682 self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR) 683 684 685 def add_sysinfo_command(self, command, logfile=None, on_every_test=False): 686 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile), 687 on_every_test) 688 689 690 def add_sysinfo_logfile(self, file, on_every_test=False): 691 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) 692 693 694 def _add_sysinfo_loggable(self, loggable, on_every_test): 695 if on_every_test: 696 self.sysinfo.test_loggables.add(loggable) 697 else: 698 self.sysinfo.boot_loggables.add(loggable) 699 700 701 def _read_warnings(self): 702 """Poll all the warning loggers and extract any new warnings that have 703 been logged. 
If the warnings belong to a category that is currently 704 disabled, this method will discard them and they will no longer be 705 retrievable. 706 707 Returns a list of (timestamp, message) tuples, where timestamp is an 708 integer epoch timestamp.""" 709 warnings = [] 710 while True: 711 # pull in a line of output from every logger that has 712 # output ready to be read 713 loggers, _, _ = select.select(self.warning_loggers, [], [], 0) 714 closed_loggers = set() 715 for logger in loggers: 716 line = logger.readline() 717 # record any broken pipes (aka line == empty) 718 if len(line) == 0: 719 closed_loggers.add(logger) 720 continue 721 # parse out the warning 722 timestamp, msgtype, msg = line.split('\t', 2) 723 timestamp = int(timestamp) 724 # if the warning is valid, add it to the results 725 if self.warning_manager.is_valid(timestamp, msgtype): 726 warnings.append((timestamp, msg.strip())) 727 728 # stop listening to loggers that are closed 729 self.warning_loggers -= closed_loggers 730 731 # stop if none of the loggers have any output left 732 if not loggers: 733 break 734 735 # sort into timestamp order 736 warnings.sort() 737 return warnings 738 739 740 def _unique_subdirectory(self, base_subdirectory_name): 741 """Compute a unique results subdirectory based on the given name. 742 743 Appends base_subdirectory_name with a number as necessary to find a 744 directory name that doesn't already exist. 745 """ 746 subdirectory = base_subdirectory_name 747 counter = 1 748 while os.path.exists(os.path.join(self.resultdir, subdirectory)): 749 subdirectory = base_subdirectory_name + '.' + str(counter) 750 counter += 1 751 return subdirectory 752 753 754 def get_record_context(self): 755 """Returns an object representing the current job.record context. 756 757 The object returned is an opaque object with a 0-arg restore method 758 which can be called to restore the job.record context (i.e. indentation) 759 to the current level. 
The intention is that it should be used when 760 something external which generate job.record calls (e.g. an autotest 761 client) can fail catastrophically and the server job record state 762 needs to be reset to its original "known good" state. 763 764 @return: A context object with a 0-arg restore() method.""" 765 return self._indenter.get_context() 766 767 768 def record_summary(self, status_code, test_name, reason='', attributes=None, 769 distinguishing_attributes=(), child_test_ids=None): 770 """Record a summary test result. 771 772 @param status_code: status code string, see 773 common_lib.log.is_valid_status() 774 @param test_name: name of the test 775 @param reason: (optional) string providing detailed reason for test 776 outcome 777 @param attributes: (optional) dict of string keyvals to associate with 778 this result 779 @param distinguishing_attributes: (optional) list of attribute names 780 that should be used to distinguish identically-named test 781 results. These attributes should be present in the attributes 782 parameter. This is used to generate user-friendly subdirectory 783 names. 784 @param child_test_ids: (optional) list of test indices for test results 785 used in generating this result. 
786 """ 787 subdirectory_name_parts = [test_name] 788 for attribute in distinguishing_attributes: 789 assert attributes 790 assert attribute in attributes, '%s not in %s' % (attribute, 791 attributes) 792 subdirectory_name_parts.append(attributes[attribute]) 793 base_subdirectory_name = '.'.join(subdirectory_name_parts) 794 795 subdirectory = self._unique_subdirectory(base_subdirectory_name) 796 subdirectory_path = os.path.join(self.resultdir, subdirectory) 797 os.mkdir(subdirectory_path) 798 799 self.record(status_code, subdirectory, test_name, 800 status=reason, optional_fields={'is_summary': True}) 801 802 if attributes: 803 utils.write_keyval(subdirectory_path, attributes) 804 805 if child_test_ids: 806 ids_string = ','.join(str(test_id) for test_id in child_test_ids) 807 summary_data = {'child_test_ids': ids_string} 808 utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'), 809 summary_data) 810 811 812 def disable_warnings(self, warning_type): 813 self.warning_manager.disable_warnings(warning_type) 814 self.record("INFO", None, None, 815 "disabling %s warnings" % warning_type, 816 {"warnings.disable": warning_type}) 817 818 819 def enable_warnings(self, warning_type): 820 self.warning_manager.enable_warnings(warning_type) 821 self.record("INFO", None, None, 822 "enabling %s warnings" % warning_type, 823 {"warnings.enable": warning_type}) 824 825 826 def get_status_log_path(self, subdir=None): 827 """Return the path to the job status log. 828 829 @param subdir - Optional paramter indicating that you want the path 830 to a subdirectory status log. 831 832 @returns The path where the status log should be. 833 """ 834 if self.resultdir: 835 if subdir: 836 return os.path.join(self.resultdir, subdir, "status.log") 837 else: 838 return os.path.join(self.resultdir, "status.log") 839 else: 840 return None 841 842 843 def _update_uncollected_logs_list(self, update_func): 844 """Updates the uncollected logs list in a multi-process safe manner. 
845 846 @param update_func - a function that updates the list of uncollected 847 logs. Should take one parameter, the list to be updated. 848 """ 849 if self._uncollected_log_file: 850 log_file = open(self._uncollected_log_file, "r+") 851 fcntl.flock(log_file, fcntl.LOCK_EX) 852 try: 853 uncollected_logs = pickle.load(log_file) 854 update_func(uncollected_logs) 855 log_file.seek(0) 856 log_file.truncate() 857 pickle.dump(uncollected_logs, log_file) 858 log_file.flush() 859 finally: 860 fcntl.flock(log_file, fcntl.LOCK_UN) 861 log_file.close() 862 863 864 def add_client_log(self, hostname, remote_path, local_path): 865 """Adds a new set of client logs to the list of uncollected logs, 866 to allow for future log recovery. 867 868 @param host - the hostname of the machine holding the logs 869 @param remote_path - the directory on the remote machine holding logs 870 @param local_path - the local directory to copy the logs into 871 """ 872 def update_func(logs_list): 873 logs_list.append((hostname, remote_path, local_path)) 874 self._update_uncollected_logs_list(update_func) 875 876 877 def remove_client_log(self, hostname, remote_path, local_path): 878 """Removes a set of client logs from the list of uncollected logs, 879 to allow for future log recovery. 880 881 @param host - the hostname of the machine holding the logs 882 @param remote_path - the directory on the remote machine holding logs 883 @param local_path - the local directory to copy the logs into 884 """ 885 def update_func(logs_list): 886 logs_list.remove((hostname, remote_path, local_path)) 887 self._update_uncollected_logs_list(update_func) 888 889 890 def get_client_logs(self): 891 """Retrieves the list of uncollected logs, if it exists. 892 893 @returns A list of (host, remote_path, local_path) tuples. Returns 894 an empty list if no uncollected logs file exists. 
895 """ 896 log_exists = (self._uncollected_log_file and 897 os.path.exists(self._uncollected_log_file)) 898 if log_exists: 899 return pickle.load(open(self._uncollected_log_file)) 900 else: 901 return [] 902 903 904 def _fill_server_control_namespace(self, namespace, protect=True): 905 """ 906 Prepare a namespace to be used when executing server control files. 907 908 This sets up the control file API by importing modules and making them 909 available under the appropriate names within namespace. 910 911 For use by _execute_code(). 912 913 Args: 914 namespace: The namespace dictionary to fill in. 915 protect: Boolean. If True (the default) any operation that would 916 clobber an existing entry in namespace will cause an error. 917 Raises: 918 error.AutoservError: When a name would be clobbered by import. 919 """ 920 def _import_names(module_name, names=()): 921 """ 922 Import a module and assign named attributes into namespace. 923 924 Args: 925 module_name: The string module name. 926 names: A limiting list of names to import from module_name. If 927 empty (the default), all names are imported from the module 928 similar to a "from foo.bar import *" statement. 929 Raises: 930 error.AutoservError: When a name being imported would clobber 931 a name already in namespace. 932 """ 933 module = __import__(module_name, {}, {}, names) 934 935 # No names supplied? Import * from the lowest level module. 936 # (Ugh, why do I have to implement this part myself?) 937 if not names: 938 for submodule_name in module_name.split('.')[1:]: 939 module = getattr(module, submodule_name) 940 if hasattr(module, '__all__'): 941 names = getattr(module, '__all__') 942 else: 943 names = dir(module) 944 945 # Install each name into namespace, checking to make sure it 946 # doesn't override anything that already exists. 947 for name in names: 948 # Check for conflicts to help prevent future problems. 
949 if name in namespace and protect: 950 if namespace[name] is not getattr(module, name): 951 raise error.AutoservError('importing name ' 952 '%s from %s %r would override %r' % 953 (name, module_name, getattr(module, name), 954 namespace[name])) 955 else: 956 # Encourage cleanliness and the use of __all__ for a 957 # more concrete API with less surprises on '*' imports. 958 warnings.warn('%s (%r) being imported from %s for use ' 959 'in server control files is not the ' 960 'first occurrance of that import.' % 961 (name, namespace[name], module_name)) 962 963 namespace[name] = getattr(module, name) 964 965 966 # This is the equivalent of prepending a bunch of import statements to 967 # the front of the control script. 968 namespace.update(os=os, sys=sys, logging=logging) 969 _import_names('autotest_lib.server', 970 ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler', 971 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel')) 972 _import_names('autotest_lib.server.subcommand', 973 ('parallel', 'parallel_simple', 'subcommand')) 974 _import_names('autotest_lib.server.utils', 975 ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine')) 976 _import_names('autotest_lib.client.common_lib.error') 977 _import_names('autotest_lib.client.common_lib.barrier', ('barrier',)) 978 979 # Inject ourself as the job object into other classes within the API. 980 # (Yuck, this injection is a gross thing be part of a public API. -gps) 981 # 982 # XXX Base & SiteAutotest do not appear to use .job. Who does? 983 namespace['autotest'].Autotest.job = self 984 # server.hosts.base_classes.Host uses .job. 985 namespace['hosts'].Host.job = self 986 987 988 def _execute_code(self, code_file, namespace, protect=True): 989 """ 990 Execute code using a copy of namespace as a server control script. 991 992 Unless protect_namespace is explicitly set to False, the dict will not 993 be modified. 994 995 Args: 996 code_file: The filename of the control file to execute. 
997 namespace: A dict containing names to make available during execution. 998 protect: Boolean. If True (the default) a copy of the namespace dict 999 is used during execution to prevent the code from modifying its 1000 contents outside of this function. If False the raw dict is 1001 passed in and modifications will be allowed. 1002 """ 1003 if protect: 1004 namespace = namespace.copy() 1005 self._fill_server_control_namespace(namespace, protect=protect) 1006 # TODO: Simplify and get rid of the special cases for only 1 machine. 1007 if len(self.machines) > 1: 1008 machines_text = '\n'.join(self.machines) + '\n' 1009 # Only rewrite the file if it does not match our machine list. 1010 try: 1011 machines_f = open(MACHINES_FILENAME, 'r') 1012 existing_machines_text = machines_f.read() 1013 machines_f.close() 1014 except EnvironmentError: 1015 existing_machines_text = None 1016 if machines_text != existing_machines_text: 1017 utils.open_write_close(MACHINES_FILENAME, machines_text) 1018 execfile(code_file, namespace, namespace) 1019 1020 1021 def _parse_status(self, new_line): 1022 if not self._using_parser: 1023 return 1024 new_tests = self.parser.process_lines([new_line]) 1025 for test in new_tests: 1026 self.__insert_test(test) 1027 1028 1029 def __insert_test(self, test): 1030 """ 1031 An internal method to insert a new test result into the 1032 database. This method will not raise an exception, even if an 1033 error occurs during the insert, to avoid failing a test 1034 simply because of unexpected database issues.""" 1035 self.num_tests_run += 1 1036 if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'): 1037 self.num_tests_failed += 1 1038 try: 1039 self.results_db.insert_test(self.job_model, test) 1040 except Exception: 1041 msg = ("WARNING: An unexpected error occured while " 1042 "inserting test results into the database. 
" 1043 "Ignoring error.\n" + traceback.format_exc()) 1044 print >> sys.stderr, msg 1045 1046 1047 def preprocess_client_state(self): 1048 """ 1049 Produce a state file for initializing the state of a client job. 1050 1051 Creates a new client state file with all the current server state, as 1052 well as some pre-set client state. 1053 1054 @returns The path of the file the state was written into. 1055 """ 1056 # initialize the sysinfo state 1057 self._state.set('client', 'sysinfo', self.sysinfo.serialize()) 1058 1059 # dump the state out to a tempfile 1060 fd, file_path = tempfile.mkstemp(dir=self.tmpdir) 1061 os.close(fd) 1062 1063 # write_to_file doesn't need locking, we exclusively own file_path 1064 self._state.write_to_file(file_path) 1065 return file_path 1066 1067 1068 def postprocess_client_state(self, state_path): 1069 """ 1070 Update the state of this job with the state from a client job. 1071 1072 Updates the state of the server side of a job with the final state 1073 of a client job that was run. Updates the non-client-specific state, 1074 pulls in some specific bits from the client-specific state, and then 1075 discards the rest. Removes the state file afterwards 1076 1077 @param state_file A path to the state file from the client. 
1078 """ 1079 # update the on-disk state 1080 try: 1081 self._state.read_from_file(state_path) 1082 os.remove(state_path) 1083 except OSError, e: 1084 # ignore file-not-found errors 1085 if e.errno != errno.ENOENT: 1086 raise 1087 else: 1088 logging.debug('Client state file %s not found', state_path) 1089 1090 # update the sysinfo state 1091 if self._state.has('client', 'sysinfo'): 1092 self.sysinfo.deserialize(self._state.get('client', 'sysinfo')) 1093 1094 # drop all the client-specific state 1095 self._state.discard_namespace('client') 1096 1097 1098 def clear_all_known_hosts(self): 1099 """Clears known hosts files for all AbstractSSHHosts.""" 1100 for host in self.hosts: 1101 if isinstance(host, abstract_ssh.AbstractSSHHost): 1102 host.clear_known_hosts() 1103 1104 1105site_server_job = utils.import_site_class( 1106 __file__, "autotest_lib.server.site_server_job", "site_server_job", 1107 base_server_job) 1108 1109class server_job(site_server_job): 1110 pass 1111 1112 1113class warning_manager(object): 1114 """Class for controlling warning logs. Manages the enabling and disabling 1115 of warnings.""" 1116 def __init__(self): 1117 # a map of warning types to a list of disabled time intervals 1118 self.disabled_warnings = {} 1119 1120 1121 def is_valid(self, timestamp, warning_type): 1122 """Indicates if a warning (based on the time it occured and its type) 1123 is a valid warning. 
A warning is considered "invalid" if this type of 1124 warning was marked as "disabled" at the time the warning occured.""" 1125 disabled_intervals = self.disabled_warnings.get(warning_type, []) 1126 for start, end in disabled_intervals: 1127 if timestamp >= start and (end is None or timestamp < end): 1128 return False 1129 return True 1130 1131 1132 def disable_warnings(self, warning_type, current_time_func=time.time): 1133 """As of now, disables all further warnings of this type.""" 1134 intervals = self.disabled_warnings.setdefault(warning_type, []) 1135 if not intervals or intervals[-1][1] is not None: 1136 intervals.append((int(current_time_func()), None)) 1137 1138 1139 def enable_warnings(self, warning_type, current_time_func=time.time): 1140 """As of now, enables all further warnings of this type.""" 1141 intervals = self.disabled_warnings.get(warning_type, []) 1142 if intervals and intervals[-1][1] is None: 1143 intervals[-1] = (intervals[-1][0], int(current_time_func())) 1144