server_job.py revision 25c0b8cb56358f22dccf7fdc32dd1662787dc9ca
1""" 2The main job wrapper for the server side. 3 4This is the core infrastructure. Derived from the client side job.py 5 6Copyright Martin J. Bligh, Andy Whitcroft 2007 7""" 8 9import getpass, os, sys, re, stat, tempfile, time, select, subprocess, traceback 10import shutil, warnings 11from autotest_lib.client.bin import fd_stack, sysinfo 12from autotest_lib.client.common_lib import error, log, utils, packages 13from autotest_lib.server import test, subcommand, profilers 14from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils 15 16 17def _control_segment_path(name): 18 """Get the pathname of the named control segment file.""" 19 server_dir = os.path.dirname(os.path.abspath(__file__)) 20 return os.path.join(server_dir, "control_segments", name) 21 22 23CLIENT_CONTROL_FILENAME = 'control' 24SERVER_CONTROL_FILENAME = 'control.srv' 25MACHINES_FILENAME = '.machines' 26 27CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper') 28CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps') 29CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo') 30INSTALL_CONTROL_FILE = _control_segment_path('install') 31CLEANUP_CONTROL_FILE = _control_segment_path('cleanup') 32 33VERIFY_CONTROL_FILE = _control_segment_path('verify') 34REPAIR_CONTROL_FILE = _control_segment_path('repair') 35 36 37# by default provide a stub that generates no site data 38def _get_site_job_data_dummy(job): 39 return {} 40 41 42# load up site-specific code for generating site-specific job data 43get_site_job_data = utils.import_site_function(__file__, 44 "autotest_lib.server.site_job", "get_site_job_data", 45 _get_site_job_data_dummy) 46 47 48class base_server_job(object): 49 """ 50 The actual job against which we do everything. 51 52 Properties: 53 autodir 54 The top level autotest directory (/usr/local/autotest). 55 serverdir 56 <autodir>/server/ 57 clientdir 58 <autodir>/client/ 59 conmuxdir 60 <autodir>/conmux/ 61 testdir 62 <autodir>/server/tests/ 63 site_testdir 64 <autodir>/server/site_tests/ 65 control 66 the control file for this job 67 drop_caches_between_iterations 68 drop the pagecache between each iteration 69 """ 70 71 STATUS_VERSION = 1 72 73 74 def __init__(self, control, args, resultdir, label, user, machines, 75 client=False, parse_job='', 76 ssh_user='root', ssh_port=22, ssh_pass=''): 77 """ 78 Server side job object. 79 80 Parameters: 81 control: The control file (pathname of) 82 args: args to pass to the control file 83 resultdir: where to throw the results 84 label: label for the job 85 user: Username for the job (email address) 86 client: True if a client-side control file 87 """ 88 path = os.path.dirname(__file__) 89 self.autodir = os.path.abspath(os.path.join(path, '..')) 90 self.serverdir = os.path.join(self.autodir, 'server') 91 self.testdir = os.path.join(self.serverdir, 'tests') 92 self.site_testdir = os.path.join(self.serverdir, 'site_tests') 93 self.tmpdir = os.path.join(self.serverdir, 'tmp') 94 self.conmuxdir = os.path.join(self.autodir, 'conmux') 95 self.clientdir = os.path.join(self.autodir, 'client') 96 self.toolsdir = os.path.join(self.autodir, 'client/tools') 97 if control: 98 self.control = open(control, 'r').read() 99 self.control = re.sub('\r', '', self.control) 100 else: 101 self.control = '' 102 self.resultdir = resultdir 103 if resultdir: 104 if not os.path.exists(resultdir): 105 os.mkdir(resultdir) 106 self.debugdir = os.path.join(resultdir, 'debug') 107 if not os.path.exists(self.debugdir): 108 os.mkdir(self.debugdir) 109 self.status = os.path.join(resultdir, 'status') 110 else: 111 self.status = None 112 self.label = label 113 self.user = user 114 self.args = args 115 self.machines = machines 116 self.client = client 117 self.record_prefix = '' 118 self.warning_loggers = set() 119 self.ssh_user = ssh_user 120 self.ssh_port = ssh_port 121 self.ssh_pass = ssh_pass 122 self.run_test_cleanup = True 123 self.last_boot_tag = None 124 self.hosts = set() 125 self.drop_caches_between_iterations = False 126 127 self.stdout = fd_stack.fd_stack(1, sys.stdout) 128 self.stderr = fd_stack.fd_stack(2, sys.stderr) 129 130 if resultdir: 131 self.sysinfo = sysinfo.sysinfo(self.resultdir) 132 self.profilers = profilers.profilers(self) 133 134 if not os.access(self.tmpdir, os.W_OK): 135 try: 136 os.makedirs(self.tmpdir, 0700) 137 except os.error, e: 138 # Thrown if the directory already exists, which it may. 139 pass 140 141 if not (os.access(self.tmpdir, os.W_OK) and os.path.isdir(self.tmpdir)): 142 self.tmpdir = os.path.join(tempfile.gettempdir(), 143 'autotest-' + getpass.getuser()) 144 try: 145 os.makedirs(self.tmpdir, 0700) 146 except os.error, e: 147 # Thrown if the directory already exists, which it may. 148 # If the problem was something other than the 149 # directory already existing, this chmod should throw as well 150 # exception. 151 os.chmod(self.tmpdir, stat.S_IRWXU) 152 153 if self.status and os.path.exists(self.status): 154 os.unlink(self.status) 155 job_data = {'label' : label, 'user' : user, 156 'hostname' : ','.join(machines), 157 'status_version' : str(self.STATUS_VERSION), 158 'job_started' : str(int(time.time()))} 159 if self.resultdir: 160 job_data.update(get_site_job_data(self)) 161 utils.write_keyval(self.resultdir, job_data) 162 163 self.parse_job = parse_job 164 if self.parse_job and len(machines) == 1: 165 self.using_parser = True 166 self.init_parser(resultdir) 167 else: 168 self.using_parser = False 169 self.pkgmgr = packages.PackageManager(self.autodir, 170 run_function_dargs={'timeout':600}) 171 self.pkgdir = os.path.join(self.autodir, 'packages') 172 173 self.num_tests_run = 0 174 self.num_tests_failed = 0 175 176 self._register_subcommand_hooks() 177 178 179 def _register_subcommand_hooks(self): 180 """ 181 Register some hooks into the subcommand modules that allow us 182 to properly clean up self.hosts created in forked subprocesses. 183 """ 184 def on_fork(cmd): 185 self._existing_hosts_on_fork = set(self.hosts) 186 def on_join(cmd): 187 new_hosts = self.hosts - self._existing_hosts_on_fork 188 for host in new_hosts: 189 host.close() 190 subcommand.subcommand.register_fork_hook(on_fork) 191 subcommand.subcommand.register_join_hook(on_join) 192 193 194 def init_parser(self, resultdir): 195 """ 196 Start the continuous parsing of resultdir. This sets up 197 the database connection and inserts the basic job object into 198 the database if necessary. 199 """ 200 # redirect parser debugging to .parse.log 201 parse_log = os.path.join(resultdir, '.parse.log') 202 parse_log = open(parse_log, 'w', 0) 203 tko_utils.redirect_parser_debugging(parse_log) 204 # create a job model object and set up the db 205 self.results_db = tko_db.db(autocommit=True) 206 self.parser = status_lib.parser(self.STATUS_VERSION) 207 self.job_model = self.parser.make_job(resultdir) 208 self.parser.start(self.job_model) 209 # check if a job already exists in the db and insert it if 210 # it does not 211 job_idx = self.results_db.find_job(self.parse_job) 212 if job_idx is None: 213 self.results_db.insert_job(self.parse_job, self.job_model) 214 else: 215 machine_idx = self.results_db.lookup_machine(self.job_model.machine) 216 self.job_model.index = job_idx 217 self.job_model.machine_idx = machine_idx 218 219 220 def cleanup_parser(self): 221 """ 222 This should be called after the server job is finished 223 to carry out any remaining cleanup (e.g. flushing any 224 remaining test results to the results db) 225 """ 226 if not self.using_parser: 227 return 228 final_tests = self.parser.end() 229 for test in final_tests: 230 self.__insert_test(test) 231 self.using_parser = False 232 233 234 def verify(self): 235 if not self.machines: 236 raise error.AutoservError('No machines specified to verify') 237 if self.resultdir: 238 os.chdir(self.resultdir) 239 try: 240 namespace = {'machines' : self.machines, 'job' : self, 241 'ssh_user' : self.ssh_user, 242 'ssh_port' : self.ssh_port, 243 'ssh_pass' : self.ssh_pass} 244 self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False) 245 except Exception, e: 246 msg = ('Verify failed\n' + str(e) + '\n' + traceback.format_exc()) 247 self.record('ABORT', None, None, msg) 248 raise 249 250 251 def repair(self, host_protection): 252 if not self.machines: 253 raise error.AutoservError('No machines specified to repair') 254 if self.resultdir: 255 os.chdir(self.resultdir) 256 namespace = {'machines': self.machines, 'job': self, 257 'ssh_user': self.ssh_user, 'ssh_port': self.ssh_port, 258 'ssh_pass': self.ssh_pass, 259 'protection_level': host_protection} 260 # no matter what happens during repair (except if it succeeded in 261 # initiating hardware repair procedure), go on to try to reverify 262 try: 263 self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False) 264 except error.AutoservHardwareRepairRequestedError: 265 raise 266 except Exception, exc: 267 print 'Exception occured during repair' 268 traceback.print_exc() 269 270 self.verify() 271 272 273 def precheck(self): 274 """ 275 perform any additional checks in derived classes. 276 """ 277 pass 278 279 280 def enable_external_logging(self): 281 """ 282 Start or restart external logging mechanism. 283 """ 284 pass 285 286 287 def disable_external_logging(self): 288 """ 289 Pause or stop external logging mechanism. 290 """ 291 pass 292 293 294 def enable_test_cleanup(self): 295 """ 296 By default tests run test.cleanup 297 """ 298 self.run_test_cleanup = True 299 300 301 def disable_test_cleanup(self): 302 """ 303 By default tests do not run test.cleanup 304 """ 305 self.run_test_cleanup = False 306 307 308 def use_external_logging(self): 309 """ 310 Return True if external logging should be used. 311 """ 312 return False 313 314 315 def parallel_simple(self, function, machines, log=True, timeout=None): 316 """ 317 Run 'function' using parallel_simple, with an extra wrapper to handle 318 the necessary setup for continuous parsing, if possible. If continuous 319 parsing is already properly initialized then this should just work. 320 """ 321 is_forking = not (len(machines) == 1 and self.machines == machines) 322 if self.parse_job and is_forking and log: 323 def wrapper(machine): 324 self.parse_job += "/" + machine 325 self.using_parser = True 326 self.machines = [machine] 327 self.resultdir = os.path.join(self.resultdir, machine) 328 os.chdir(self.resultdir) 329 utils.write_keyval(self.resultdir, {"hostname": machine}) 330 self.init_parser(self.resultdir) 331 result = function(machine) 332 self.cleanup_parser() 333 return result 334 elif len(machines) > 1 and log: 335 def wrapper(machine): 336 self.resultdir = os.path.join(self.resultdir, machine) 337 os.chdir(self.resultdir) 338 result = function(machine) 339 return result 340 else: 341 wrapper = function 342 subcommand.parallel_simple(wrapper, machines, log, timeout) 343 344 345 def run(self, cleanup=False, install_before=False, install_after=False, 346 collect_crashdumps=True, namespace={}): 347 # use a copy so changes don't affect the original dictionary 348 namespace = namespace.copy() 349 machines = self.machines 350 351 self.aborted = False 352 namespace['machines'] = machines 353 namespace['args'] = self.args 354 namespace['job'] = self 355 namespace['ssh_user'] = self.ssh_user 356 namespace['ssh_port'] = self.ssh_port 357 namespace['ssh_pass'] = self.ssh_pass 358 test_start_time = int(time.time()) 359 360 if self.resultdir: 361 os.chdir(self.resultdir) 362 363 self.enable_external_logging() 364 status_log = os.path.join(self.resultdir, 'status.log') 365 collect_crashinfo = True 366 temp_control_file_dir = None 367 try: 368 if install_before and machines: 369 self._execute_code(INSTALL_CONTROL_FILE, namespace) 370 if self.resultdir: 371 server_control_file = SERVER_CONTROL_FILENAME 372 client_control_file = CLIENT_CONTROL_FILENAME 373 else: 374 temp_control_file_dir = tempfile.mkdtemp() 375 server_control_file = os.path.join(temp_control_file_dir, 376 SERVER_CONTROL_FILENAME) 377 client_control_file = os.path.join(temp_control_file_dir, 378 CLIENT_CONTROL_FILENAME) 379 if self.client: 380 namespace['control'] = self.control 381 utils.open_write_close(client_control_file, self.control) 382 shutil.copy(CLIENT_WRAPPER_CONTROL_FILE, server_control_file) 383 else: 384 namespace['utils'] = utils 385 utils.open_write_close(server_control_file, self.control) 386 self._execute_code(server_control_file, namespace) 387 388 # disable crashinfo collection if we get this far without error 389 collect_crashinfo = False 390 finally: 391 if temp_control_file_dir: 392 # Clean up temp. directory used for copies of the control files. 393 try: 394 shutil.rmtree(temp_control_file_dir) 395 except Exception, e: 396 print 'Error', e, 'removing dir', temp_control_file_dir 397 if machines and (collect_crashdumps or collect_crashinfo): 398 namespace['test_start_time'] = test_start_time 399 if collect_crashinfo: 400 # includes crashdumps 401 self._execute_code(CRASHINFO_CONTROL_FILE, namespace) 402 else: 403 self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace) 404 self.disable_external_logging() 405 if cleanup and machines: 406 self._execute_code(CLEANUP_CONTROL_FILE, namespace) 407 if install_after and machines: 408 self._execute_code(INSTALL_CONTROL_FILE, namespace) 409 410 411 def run_test(self, url, *args, **dargs): 412 """ 413 Summon a test object and run it. 414 415 tag 416 tag to add to testname 417 url 418 url of the test to run 419 """ 420 421 (group, testname) = self.pkgmgr.get_package_name(url, 'test') 422 423 tag = dargs.pop('tag', None) 424 if tag: 425 testname += '.' + str(tag) 426 subdir = testname 427 428 outputdir = os.path.join(self.resultdir, subdir) 429 if os.path.exists(outputdir): 430 msg = ("%s already exists, test <%s> may have" 431 " already run with tag <%s>" % (outputdir, testname, tag)) 432 raise error.TestError(msg) 433 os.mkdir(outputdir) 434 435 def group_func(): 436 try: 437 test.runtest(self, url, tag, args, dargs) 438 except error.TestBaseException, e: 439 self.record(e.exit_status, subdir, testname, str(e)) 440 raise 441 except Exception, e: 442 info = str(e) + "\n" + traceback.format_exc() 443 self.record('FAIL', subdir, testname, info) 444 raise 445 else: 446 self.record('GOOD', subdir, testname, 'completed successfully') 447 448 result, exc_info = self._run_group(testname, subdir, group_func) 449 if exc_info and isinstance(exc_info[1], error.TestBaseException): 450 return False 451 elif exc_info: 452 raise exc_info[0], exc_info[1], exc_info[2] 453 else: 454 return True 455 456 457 def _run_group(self, name, subdir, function, *args, **dargs): 458 """\ 459 Underlying method for running something inside of a group. 460 """ 461 result, exc_info = None, None 462 old_record_prefix = self.record_prefix 463 try: 464 self.record('START', subdir, name) 465 self.record_prefix += '\t' 466 try: 467 result = function(*args, **dargs) 468 finally: 469 self.record_prefix = old_record_prefix 470 except error.TestBaseException, e: 471 self.record("END %s" % e.exit_status, subdir, name) 472 exc_info = sys.exc_info() 473 except Exception, e: 474 err_msg = str(e) + '\n' 475 err_msg += traceback.format_exc() 476 self.record('END ABORT', subdir, name, err_msg) 477 raise error.JobError(name + ' failed\n' + traceback.format_exc()) 478 else: 479 self.record('END GOOD', subdir, name) 480 481 return result, exc_info 482 483 484 def run_group(self, function, *args, **dargs): 485 """\ 486 function: 487 subroutine to run 488 *args: 489 arguments for the function 490 """ 491 492 name = function.__name__ 493 494 # Allow the tag for the group to be specified. 495 tag = dargs.pop('tag', None) 496 if tag: 497 name = tag 498 499 return self._run_group(name, None, function, *args, **dargs)[0] 500 501 502 def run_reboot(self, reboot_func, get_kernel_func): 503 """\ 504 A specialization of run_group meant specifically for handling 505 a reboot. Includes support for capturing the kernel version 506 after the reboot. 507 508 reboot_func: a function that carries out the reboot 509 510 get_kernel_func: a function that returns a string 511 representing the kernel version. 512 """ 513 514 old_record_prefix = self.record_prefix 515 try: 516 self.record('START', None, 'reboot') 517 self.record_prefix += '\t' 518 reboot_func() 519 except Exception, e: 520 self.record_prefix = old_record_prefix 521 err_msg = str(e) + '\n' + traceback.format_exc() 522 self.record('END FAIL', None, 'reboot', err_msg) 523 else: 524 kernel = get_kernel_func() 525 self.record_prefix = old_record_prefix 526 self.record('END GOOD', None, 'reboot', 527 optional_fields={"kernel": kernel}) 528 529 530 def add_sysinfo_command(self, command, logfile=None, on_every_test=False): 531 self._add_sysinfo_loggable(sysinfo.command(command, logfile), 532 on_every_test) 533 534 535 def add_sysinfo_logfile(self, file, on_every_test=False): 536 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) 537 538 539 def _add_sysinfo_loggable(self, loggable, on_every_test): 540 if on_every_test: 541 self.sysinfo.test_loggables.add(loggable) 542 else: 543 self.sysinfo.boot_loggables.add(loggable) 544 545 546 def record(self, status_code, subdir, operation, status='', 547 optional_fields=None): 548 """ 549 Record job-level status 550 551 The intent is to make this file both machine parseable and 552 human readable. That involves a little more complexity, but 553 really isn't all that bad ;-) 554 555 Format is <status code>\t<subdir>\t<operation>\t<status> 556 557 status code: see common_lib.log.is_valid_status() 558 for valid status definition 559 560 subdir: MUST be a relevant subdirectory in the results, 561 or None, which will be represented as '----' 562 563 operation: description of what you ran (e.g. "dbench", or 564 "mkfs -t foobar /dev/sda9") 565 566 status: error message or "completed sucessfully" 567 568 ------------------------------------------------------------ 569 570 Initial tabs indicate indent levels for grouping, and is 571 governed by self.record_prefix 572 573 multiline messages have secondary lines prefaced by a double 574 space (' ') 575 576 Executing this method will trigger the logging of all new 577 warnings to date from the various console loggers. 578 """ 579 # poll all our warning loggers for new warnings 580 warnings = self._read_warnings() 581 for timestamp, msg in warnings: 582 self._record("WARN", None, None, msg, timestamp) 583 584 # write out the actual status log line 585 self._record(status_code, subdir, operation, status, 586 optional_fields=optional_fields) 587 588 589 def _read_warnings(self): 590 warnings = [] 591 while True: 592 # pull in a line of output from every logger that has 593 # output ready to be read 594 loggers, _, _ = select.select(self.warning_loggers, [], [], 0) 595 closed_loggers = set() 596 for logger in loggers: 597 line = logger.readline() 598 # record any broken pipes (aka line == empty) 599 if len(line) == 0: 600 closed_loggers.add(logger) 601 continue 602 timestamp, msg = line.split('\t', 1) 603 warnings.append((int(timestamp), msg.strip())) 604 605 # stop listening to loggers that are closed 606 self.warning_loggers -= closed_loggers 607 608 # stop if none of the loggers have any output left 609 if not loggers: 610 break 611 612 # sort into timestamp order 613 warnings.sort() 614 return warnings 615 616 617 def _render_record(self, status_code, subdir, operation, status='', 618 epoch_time=None, record_prefix=None, 619 optional_fields=None): 620 """ 621 Internal Function to generate a record to be written into a 622 status log. For use by server_job.* classes only. 623 """ 624 if subdir: 625 if re.match(r'[\n\t]', subdir): 626 raise ValueError('Invalid character in subdir string') 627 substr = subdir 628 else: 629 substr = '----' 630 631 if not log.is_valid_status(status_code): 632 raise ValueError('Invalid status code supplied: %s' % status_code) 633 if not operation: 634 operation = '----' 635 if re.match(r'[\n\t]', operation): 636 raise ValueError('Invalid character in operation string') 637 operation = operation.rstrip() 638 status = status.rstrip() 639 status = re.sub(r"\t", " ", status) 640 # Ensure any continuation lines are marked so we can 641 # detect them in the status file to ensure it is parsable. 642 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status) 643 644 if not optional_fields: 645 optional_fields = {} 646 647 # Generate timestamps for inclusion in the logs 648 if epoch_time is None: 649 epoch_time = int(time.time()) 650 local_time = time.localtime(epoch_time) 651 optional_fields["timestamp"] = str(epoch_time) 652 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", 653 local_time) 654 655 fields = [status_code, substr, operation] 656 fields += ["%s=%s" % x for x in optional_fields.iteritems()] 657 fields.append(status) 658 659 if record_prefix is None: 660 record_prefix = self.record_prefix 661 662 msg = '\t'.join(str(x) for x in fields) 663 return record_prefix + msg + '\n' 664 665 666 def _record_prerendered(self, msg): 667 """ 668 Record a pre-rendered msg into the status logs. The only 669 change this makes to the message is to add on the local 670 indentation. Should not be called outside of server_job.* 671 classes. Unlike _record, this does not write the message 672 to standard output. 673 """ 674 lines = [] 675 status_file = os.path.join(self.resultdir, 'status.log') 676 status_log = open(status_file, 'a') 677 for line in msg.splitlines(): 678 line = self.record_prefix + line + '\n' 679 lines.append(line) 680 status_log.write(line) 681 status_log.close() 682 self.__parse_status(lines) 683 684 685 def _fill_server_control_namespace(self, namespace, protect=True): 686 """ 687 Prepare a namespace to be used when executing server control files. 688 689 This sets up the control file API by importing modules and making them 690 available under the appropriate names within namespace. 691 692 For use by _execute_code(). 693 694 Args: 695 namespace: The namespace dictionary to fill in. 696 protect: Boolean. If True (the default) any operation that would 697 clobber an existing entry in namespace will cause an error. 698 Raises: 699 error.AutoservError: When a name would be clobbered by import. 700 """ 701 def _import_names(module_name, names=()): 702 """ 703 Import a module and assign named attributes into namespace. 704 705 Args: 706 module_name: The string module name. 707 names: A limiting list of names to import from module_name. If 708 empty (the default), all names are imported from the module 709 similar to a "from foo.bar import *" statement. 710 Raises: 711 error.AutoservError: When a name being imported would clobber 712 a name already in namespace. 713 """ 714 module = __import__(module_name, {}, {}, names) 715 716 # No names supplied? Import * from the lowest level module. 717 # (Ugh, why do I have to implement this part myself?) 718 if not names: 719 for submodule_name in module_name.split('.')[1:]: 720 module = getattr(module, submodule_name) 721 if hasattr(module, '__all__'): 722 names = getattr(module, '__all__') 723 else: 724 names = dir(module) 725 726 # Install each name into namespace, checking to make sure it 727 # doesn't override anything that already exists. 728 for name in names: 729 # Check for conflicts to help prevent future problems. 730 if name in namespace and protect: 731 if namespace[name] is not getattr(module, name): 732 raise error.AutoservError('importing name ' 733 '%s from %s %r would override %r' % 734 (name, module_name, getattr(module, name), 735 namespace[name])) 736 else: 737 # Encourage cleanliness and the use of __all__ for a 738 # more concrete API with less surprises on '*' imports. 739 warnings.warn('%s (%r) being imported from %s for use ' 740 'in server control files is not the ' 741 'first occurrance of that import.' % 742 (name, namespace[name], module_name)) 743 744 namespace[name] = getattr(module, name) 745 746 747 # This is the equivalent of prepending a bunch of import statements to 748 # the front of the control script. 749 namespace.update(os=os, sys=sys) 750 _import_names('autotest_lib.server', 751 ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler', 752 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel')) 753 _import_names('autotest_lib.server.subcommand', 754 ('parallel', 'parallel_simple', 'subcommand')) 755 _import_names('autotest_lib.server.utils', 756 ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine')) 757 _import_names('autotest_lib.client.common_lib.error') 758 _import_names('autotest_lib.client.common_lib.barrier', ('barrier',)) 759 760 # Inject ourself as the job object into other classes within the API. 761 # (Yuck, this injection is a gross thing be part of a public API. -gps) 762 # 763 # XXX Base & SiteAutotest do not appear to use .job. Who does? 764 namespace['autotest'].Autotest.job = self 765 # server.hosts.base_classes.Host uses .job. 766 namespace['hosts'].Host.job = self 767 768 769 def _execute_code(self, code_file, namespace, protect=True): 770 """ 771 Execute code using a copy of namespace as a server control script. 772 773 Unless protect_namespace is explicitly set to False, the dict will not 774 be modified. 775 776 Args: 777 code_file: The filename of the control file to execute. 778 namespace: A dict containing names to make available during execution. 779 protect: Boolean. If True (the default) a copy of the namespace dict 780 is used during execution to prevent the code from modifying its 781 contents outside of this function. If False the raw dict is 782 passed in and modifications will be allowed. 783 """ 784 if protect: 785 namespace = namespace.copy() 786 self._fill_server_control_namespace(namespace, protect=protect) 787 # TODO: Simplify and get rid of the special cases for only 1 machine. 788 if len(self.machines) > 1: 789 machines_text = '\n'.join(self.machines) + '\n' 790 # Only rewrite the file if it does not match our machine list. 791 try: 792 machines_f = open(MACHINES_FILENAME, 'r') 793 existing_machines_text = machines_f.read() 794 machines_f.close() 795 except EnvironmentError: 796 existing_machines_text = None 797 if machines_text != existing_machines_text: 798 utils.open_write_close(MACHINES_FILENAME, machines_text) 799 execfile(code_file, namespace, namespace) 800 801 802 def _record(self, status_code, subdir, operation, status='', 803 epoch_time=None, optional_fields=None): 804 """ 805 Actual function for recording a single line into the status 806 logs. Should never be called directly, only by job.record as 807 this would bypass the console monitor logging. 808 """ 809 810 msg = self._render_record(status_code, subdir, operation, status, 811 epoch_time, optional_fields=optional_fields) 812 813 814 status_file = os.path.join(self.resultdir, 'status.log') 815 sys.stdout.write(msg) 816 open(status_file, "a").write(msg) 817 if subdir: 818 test_dir = os.path.join(self.resultdir, subdir) 819 status_file = os.path.join(test_dir, 'status.log') 820 open(status_file, "a").write(msg) 821 self.__parse_status(msg.splitlines()) 822 823 824 def __parse_status(self, new_lines): 825 if not self.using_parser: 826 return 827 new_tests = self.parser.process_lines(new_lines) 828 for test in new_tests: 829 self.__insert_test(test) 830 831 832 def __insert_test(self, test): 833 """ 834 An internal method to insert a new test result into the 835 database. This method will not raise an exception, even if an 836 error occurs during the insert, to avoid failing a test 837 simply because of unexpected database issues.""" 838 self.num_tests_run += 1 839 if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'): 840 self.num_tests_failed += 1 841 try: 842 self.results_db.insert_test(self.job_model, test) 843 except Exception: 844 msg = ("WARNING: An unexpected error occured while " 845 "inserting test results into the database. " 846 "Ignoring error.\n" + traceback.format_exc()) 847 print >> sys.stderr, msg 848 849 850site_server_job = utils.import_site_class( 851 __file__, "autotest_lib.server.site_server_job", "site_server_job", 852 base_server_job) 853 854class server_job(site_server_job, base_server_job): 855 pass 856