1"""The main job wrapper 2 3This is the core infrastructure. 4 5Copyright Andy Whitcroft, Martin J. Bligh 2006 6""" 7 8import copy, os, re, shutil, sys, time, traceback, types, glob 9import logging, getpass, weakref 10from autotest_lib.client.bin import client_logging_config 11from autotest_lib.client.bin import utils, parallel, kernel, xen 12from autotest_lib.client.bin import profilers, boottool, harness 13from autotest_lib.client.bin import config, sysinfo, test, local_host 14from autotest_lib.client.bin import partition as partition_lib 15from autotest_lib.client.common_lib import base_job 16from autotest_lib.client.common_lib import error, barrier, logging_manager 17from autotest_lib.client.common_lib import base_packages, packages 18from autotest_lib.client.common_lib import global_config 19from autotest_lib.client.tools import html_report 20 21GLOBAL_CONFIG = global_config.global_config 22 23LAST_BOOT_TAG = object() 24JOB_PREAMBLE = """ 25from autotest_lib.client.common_lib.error import * 26from autotest_lib.client.bin.utils import * 27""" 28 29 30class StepError(error.AutotestError): 31 pass 32 33class NotAvailableError(error.AutotestError): 34 pass 35 36 37 38def _run_test_complete_on_exit(f): 39 """Decorator for job methods that automatically calls 40 self.harness.run_test_complete when the method exits, if appropriate.""" 41 def wrapped(self, *args, **dargs): 42 try: 43 return f(self, *args, **dargs) 44 finally: 45 if self._logger.global_filename == 'status': 46 self.harness.run_test_complete() 47 if self.drop_caches: 48 utils.drop_caches() 49 wrapped.__name__ = f.__name__ 50 wrapped.__doc__ = f.__doc__ 51 wrapped.__dict__.update(f.__dict__) 52 return wrapped 53 54 55class status_indenter(base_job.status_indenter): 56 """Provide a status indenter that is backed by job._record_prefix.""" 57 def __init__(self, job): 58 self.job = weakref.proxy(job) # avoid a circular reference 59 60 61 @property 62 def indent(self): 63 return self.job._record_indent 64 65 
66 def increment(self): 67 self.job._record_indent += 1 68 69 70 def decrement(self): 71 self.job._record_indent -= 1 72 73 74class base_client_job(base_job.base_job): 75 """The client-side concrete implementation of base_job. 76 77 Optional properties provided by this implementation: 78 control 79 bootloader 80 harness 81 """ 82 83 _WARNING_DISABLE_DELAY = 5 84 85 # _record_indent is a persistent property, but only on the client 86 _job_state = base_job.base_job._job_state 87 _record_indent = _job_state.property_factory( 88 '_state', '_record_indent', 0, namespace='client') 89 _max_disk_usage_rate = _job_state.property_factory( 90 '_state', '_max_disk_usage_rate', 0.0, namespace='client') 91 92 93 def __init__(self, control, options, drop_caches=True, 94 extra_copy_cmdline=None): 95 """ 96 Prepare a client side job object. 97 98 @param control: The control file (pathname of). 99 @param options: an object which includes: 100 jobtag: The job tag string (eg "default"). 101 cont: If this is the continuation of this job. 102 harness_type: An alternative server harness. [None] 103 use_external_logging: If true, the enable_external_logging 104 method will be called during construction. [False] 105 @param drop_caches: If true, utils.drop_caches() is called before and 106 between all tests. 
[True] 107 @param extra_copy_cmdline: list of additional /proc/cmdline arguments to 108 copy from the running kernel to all the installed kernels with 109 this job 110 """ 111 super(base_client_job, self).__init__(options=options) 112 self._pre_record_init(control, options) 113 try: 114 self._post_record_init(control, options, drop_caches, 115 extra_copy_cmdline) 116 except Exception, err: 117 self.record( 118 'ABORT', None, None,'client.bin.job.__init__ failed: %s' % 119 str(err)) 120 raise 121 122 123 @classmethod 124 def _get_environ_autodir(cls): 125 return os.environ['AUTODIR'] 126 127 128 @classmethod 129 def _find_base_directories(cls): 130 """ 131 Determine locations of autodir and clientdir (which are the same) 132 using os.environ. Serverdir does not exist in this context. 133 """ 134 autodir = clientdir = cls._get_environ_autodir() 135 return autodir, clientdir, None 136 137 138 @classmethod 139 def _parse_args(cls, args): 140 return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args) 141 142 143 def _find_resultdir(self, options): 144 """ 145 Determine the directory for storing results. On a client this is 146 always <autodir>/results/<tag>, where tag is passed in on the command 147 line as an option. 148 """ 149 output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT', 150 'output_dir', 151 default="") 152 if options.output_dir: 153 basedir = options.output_dir 154 elif output_dir_config: 155 basedir = output_dir_config 156 else: 157 basedir = self.autodir 158 159 return os.path.join(basedir, 'results', options.tag) 160 161 162 def _get_status_logger(self): 163 """Return a reference to the status logger.""" 164 return self._logger 165 166 167 def _pre_record_init(self, control, options): 168 """ 169 Initialization function that should peform ONLY the required 170 setup so that the self.record() method works. 171 172 As of now self.record() needs self.resultdir, self._group_level, 173 self.harness and of course self._logger. 
174 """ 175 if not options.cont: 176 self._cleanup_debugdir_files() 177 self._cleanup_results_dir() 178 179 logging_manager.configure_logging( 180 client_logging_config.ClientLoggingConfig(), 181 results_dir=self.resultdir, 182 verbose=options.verbose) 183 logging.info('Writing results to %s', self.resultdir) 184 185 # init_group_level needs the state 186 self.control = os.path.realpath(control) 187 self._is_continuation = options.cont 188 self._current_step_ancestry = [] 189 self._next_step_index = 0 190 self._load_state() 191 192 _harness = self.handle_persistent_option(options, 'harness') 193 _harness_args = self.handle_persistent_option(options, 'harness_args') 194 195 self.harness = harness.select(_harness, self, _harness_args) 196 197 # set up the status logger 198 def client_job_record_hook(entry): 199 msg_tag = '' 200 if '.' in self._logger.global_filename: 201 msg_tag = self._logger.global_filename.split('.', 1)[1] 202 # send the entry to the job harness 203 message = '\n'.join([entry.message] + entry.extra_message_lines) 204 rendered_entry = self._logger.render_entry(entry) 205 self.harness.test_status_detail(entry.status_code, entry.subdir, 206 entry.operation, message, msg_tag, 207 entry.fields) 208 self.harness.test_status(rendered_entry, msg_tag) 209 # send the entry to stdout, if it's enabled 210 logging.info(rendered_entry) 211 self._logger = base_job.status_logger( 212 self, status_indenter(self), record_hook=client_job_record_hook, 213 tap_writer=self._tap) 214 215 def _post_record_init(self, control, options, drop_caches, 216 extra_copy_cmdline): 217 """ 218 Perform job initialization not required by self.record(). 
219 """ 220 self._init_drop_caches(drop_caches) 221 222 self._init_packages() 223 224 self.sysinfo = sysinfo.sysinfo(self.resultdir) 225 self._load_sysinfo_state() 226 227 if not options.cont: 228 download = os.path.join(self.testdir, 'download') 229 if not os.path.exists(download): 230 os.mkdir(download) 231 232 shutil.copyfile(self.control, 233 os.path.join(self.resultdir, 'control')) 234 235 self.control = control 236 237 self.logging = logging_manager.get_logging_manager( 238 manage_stdout_and_stderr=True, redirect_fds=True) 239 self.logging.start_logging() 240 241 self._config = config.config(self) 242 self.profilers = profilers.profilers(self) 243 244 self._init_bootloader() 245 246 self.machines = [options.hostname] 247 self.machine_dict_list = [{'hostname' : options.hostname}] 248 # Client side tests should always run the same whether or not they are 249 # running in the lab. 250 self.in_lab = False 251 self.hosts = set([local_host.LocalHost(hostname=options.hostname, 252 bootloader=self.bootloader)]) 253 254 self.args = [] 255 if options.args: 256 self.args = self._parse_args(options.args) 257 258 if options.user: 259 self.user = options.user 260 else: 261 self.user = getpass.getuser() 262 263 self.sysinfo.log_per_reboot_data() 264 265 if not options.cont: 266 self.record('START', None, None) 267 268 self.harness.run_start() 269 270 if options.log: 271 self.enable_external_logging() 272 273 self._init_cmdline(extra_copy_cmdline) 274 275 self.num_tests_run = None 276 self.num_tests_failed = None 277 278 self.warning_loggers = None 279 self.warning_manager = None 280 281 282 def _init_drop_caches(self, drop_caches): 283 """ 284 Perform the drop caches initialization. 
285 """ 286 self.drop_caches_between_iterations = ( 287 GLOBAL_CONFIG.get_config_value('CLIENT', 288 'drop_caches_between_iterations', 289 type=bool, default=True)) 290 self.drop_caches = drop_caches 291 if self.drop_caches: 292 utils.drop_caches() 293 294 295 def _init_bootloader(self): 296 """ 297 Perform boottool initialization. 298 """ 299 tool = self.config_get('boottool.executable') 300 self.bootloader = boottool.boottool(tool) 301 302 303 def _init_packages(self): 304 """ 305 Perform the packages support initialization. 306 """ 307 self.pkgmgr = packages.PackageManager( 308 self.autodir, run_function_dargs={'timeout':3600}) 309 310 311 def _init_cmdline(self, extra_copy_cmdline): 312 """ 313 Initialize default cmdline for booted kernels in this job. 314 """ 315 copy_cmdline = set(['console']) 316 if extra_copy_cmdline is not None: 317 copy_cmdline.update(extra_copy_cmdline) 318 319 # extract console= and other args from cmdline and add them into the 320 # base args that we use for all kernels we install 321 cmdline = utils.read_one_line('/proc/cmdline') 322 kernel_args = [] 323 for karg in cmdline.split(): 324 for param in copy_cmdline: 325 if karg.startswith(param) and \ 326 (len(param) == len(karg) or karg[len(param)] == '='): 327 kernel_args.append(karg) 328 self.config_set('boot.default_args', ' '.join(kernel_args)) 329 330 331 def _cleanup_results_dir(self): 332 """Delete everything in resultsdir""" 333 assert os.path.exists(self.resultdir) 334 list_files = glob.glob('%s/*' % self.resultdir) 335 for f in list_files: 336 if os.path.isdir(f): 337 shutil.rmtree(f) 338 elif os.path.isfile(f): 339 os.remove(f) 340 341 342 def _cleanup_debugdir_files(self): 343 """ 344 Delete any leftover debugdir files 345 """ 346 list_files = glob.glob("/tmp/autotest_results_dir.*") 347 for f in list_files: 348 os.remove(f) 349 350 351 def disable_warnings(self, warning_type): 352 self.record("INFO", None, None, 353 "disabling %s warnings" % warning_type, 354 
{"warnings.disable": warning_type}) 355 time.sleep(self._WARNING_DISABLE_DELAY) 356 357 358 def enable_warnings(self, warning_type): 359 time.sleep(self._WARNING_DISABLE_DELAY) 360 self.record("INFO", None, None, 361 "enabling %s warnings" % warning_type, 362 {"warnings.enable": warning_type}) 363 364 365 def monitor_disk_usage(self, max_rate): 366 """\ 367 Signal that the job should monitor disk space usage on / 368 and generate a warning if a test uses up disk space at a 369 rate exceeding 'max_rate'. 370 371 Parameters: 372 max_rate - the maximium allowed rate of disk consumption 373 during a test, in MB/hour, or 0 to indicate 374 no limit. 375 """ 376 self._max_disk_usage_rate = max_rate 377 378 379 def relative_path(self, path): 380 """\ 381 Return a patch relative to the job results directory 382 """ 383 head = len(self.resultdir) + 1 # remove the / inbetween 384 return path[head:] 385 386 387 def control_get(self): 388 return self.control 389 390 391 def control_set(self, control): 392 self.control = os.path.abspath(control) 393 394 395 def harness_select(self, which, harness_args): 396 self.harness = harness.select(which, self, harness_args) 397 398 399 def config_set(self, name, value): 400 self._config.set(name, value) 401 402 403 def config_get(self, name): 404 return self._config.get(name) 405 406 407 def setup_dirs(self, results_dir, tmp_dir): 408 if not tmp_dir: 409 tmp_dir = os.path.join(self.tmpdir, 'build') 410 if not os.path.exists(tmp_dir): 411 os.mkdir(tmp_dir) 412 if not os.path.isdir(tmp_dir): 413 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 414 raise ValueError(e_msg) 415 416 # We label the first build "build" and then subsequent ones 417 # as "build.2", "build.3", etc. Whilst this is a little bit 418 # inconsistent, 99.9% of jobs will only have one build 419 # (that's not done as kernbench, sparse, or buildtest), 420 # so it works out much cleaner. One of life's comprimises. 
421 if not results_dir: 422 results_dir = os.path.join(self.resultdir, 'build') 423 i = 2 424 while os.path.exists(results_dir): 425 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 426 i += 1 427 if not os.path.exists(results_dir): 428 os.mkdir(results_dir) 429 430 return (results_dir, tmp_dir) 431 432 433 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 434 kjob = None ): 435 """Summon a xen object""" 436 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 437 build_dir = 'xen' 438 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, 439 leave, kjob) 440 441 442 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 443 """Summon a kernel object""" 444 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 445 build_dir = 'linux' 446 return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir, 447 build_dir, leave) 448 449 450 def barrier(self, *args, **kwds): 451 """Create a barrier object""" 452 return barrier.barrier(*args, **kwds) 453 454 455 def install_pkg(self, name, pkg_type, install_dir): 456 ''' 457 This method is a simple wrapper around the actual package 458 installation method in the Packager class. This is used 459 internally by the profilers, deps and tests code. 460 name : name of the package (ex: sleeptest, dbench etc.) 461 pkg_type : Type of the package (ex: test, dep etc.) 462 install_dir : The directory in which the source is actually 463 untarred into. (ex: client/profilers/<name> for profilers) 464 ''' 465 if self.pkgmgr.repositories: 466 self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir) 467 468 469 def add_repository(self, repo_urls): 470 ''' 471 Adds the repository locations to the job so that packages 472 can be fetched from them when needed. 
The repository list 473 needs to be a string list 474 Ex: job.add_repository(['http://blah1','http://blah2']) 475 ''' 476 for repo_url in repo_urls: 477 self.pkgmgr.add_repository(repo_url) 478 479 # Fetch the packages' checksum file that contains the checksums 480 # of all the packages if it is not already fetched. The checksum 481 # is always fetched whenever a job is first started. This 482 # is not done in the job's constructor as we don't have the list of 483 # the repositories there (and obviously don't care about this file 484 # if we are not using the repos) 485 try: 486 checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir, 487 base_packages.CHECKSUM_FILE) 488 self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE, 489 checksum_file_path, use_checksum=False) 490 except error.PackageFetchError: 491 # packaging system might not be working in this case 492 # Silently fall back to the normal case 493 pass 494 495 496 def require_gcc(self): 497 """ 498 Test whether gcc is installed on the machine. 499 """ 500 # check if gcc is installed on the system. 501 try: 502 utils.system('which gcc') 503 except error.CmdError, e: 504 raise NotAvailableError('gcc is required by this job and is ' 505 'not available on the system') 506 507 508 def setup_dep(self, deps): 509 """Set up the dependencies for this test. 510 deps is a list of libraries required for this test. 511 """ 512 # Fetch the deps from the repositories and set them up. 513 for dep in deps: 514 dep_dir = os.path.join(self.autodir, 'deps', dep) 515 # Search for the dependency in the repositories if specified, 516 # else check locally. 
517 try: 518 self.install_pkg(dep, 'dep', dep_dir) 519 except error.PackageInstallError: 520 # see if the dep is there locally 521 pass 522 523 # dep_dir might not exist if it is not fetched from the repos 524 if not os.path.exists(dep_dir): 525 raise error.TestError("Dependency %s does not exist" % dep) 526 527 os.chdir(dep_dir) 528 if execfile('%s.py' % dep, {}) is None: 529 logging.info('Dependency %s successfuly built', dep) 530 531 532 def _runtest(self, url, tag, timeout, args, dargs): 533 try: 534 l = lambda : test.runtest(self, url, tag, args, dargs) 535 pid = parallel.fork_start(self.resultdir, l) 536 537 if timeout: 538 logging.debug('Waiting for pid %d for %d seconds', pid, timeout) 539 parallel.fork_waitfor_timed(self.resultdir, pid, timeout) 540 else: 541 parallel.fork_waitfor(self.resultdir, pid) 542 543 except error.TestBaseException: 544 # These are already classified with an error type (exit_status) 545 raise 546 except error.JobError: 547 raise # Caught further up and turned into an ABORT. 548 except Exception, e: 549 # Converts all other exceptions thrown by the test regardless 550 # of phase into a TestError(TestBaseException) subclass that 551 # reports them with their full stack trace. 552 raise error.UnhandledTestError(e) 553 554 555 def _run_test_base(self, url, *args, **dargs): 556 """ 557 Prepares arguments and run functions to run_test and run_test_detail. 558 559 @param url A url that identifies the test to run. 560 @param tag An optional keyword argument that will be added to the 561 test and subdir name. 562 @param subdir_tag An optional keyword argument that will be added 563 to the subdir name. 
564 565 @returns: 566 subdir: Test subdirectory 567 testname: Test name 568 group_func: Actual test run function 569 timeout: Test timeout 570 """ 571 group, testname = self.pkgmgr.get_package_name(url, 'test') 572 testname, subdir, tag = self._build_tagged_test_name(testname, dargs) 573 outputdir = self._make_test_outputdir(subdir) 574 575 timeout = dargs.pop('timeout', None) 576 if timeout: 577 logging.debug('Test has timeout: %d sec.', timeout) 578 579 def log_warning(reason): 580 self.record("WARN", subdir, testname, reason) 581 @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate) 582 def group_func(): 583 try: 584 self._runtest(url, tag, timeout, args, dargs) 585 except error.TestBaseException, detail: 586 # The error is already classified, record it properly. 587 self.record(detail.exit_status, subdir, testname, str(detail)) 588 raise 589 else: 590 self.record('GOOD', subdir, testname, 'completed successfully') 591 592 return (subdir, testname, group_func, timeout) 593 594 595 @_run_test_complete_on_exit 596 def run_test(self, url, *args, **dargs): 597 """ 598 Summon a test object and run it. 599 600 @param url A url that identifies the test to run. 601 @param tag An optional keyword argument that will be added to the 602 test and subdir name. 603 @param subdir_tag An optional keyword argument that will be added 604 to the subdir name. 605 606 @returns True if the test passes, False otherwise. 607 """ 608 (subdir, testname, group_func, timeout) = self._run_test_base(url, 609 *args, 610 **dargs) 611 try: 612 self._rungroup(subdir, testname, group_func, timeout) 613 return True 614 except error.TestBaseException: 615 return False 616 # Any other exception here will be given to the caller 617 # 618 # NOTE: The only exception possible from the control file here 619 # is error.JobError as _runtest() turns all others into an 620 # UnhandledTestError that is caught above. 
621 622 623 @_run_test_complete_on_exit 624 def run_test_detail(self, url, *args, **dargs): 625 """ 626 Summon a test object and run it, returning test status. 627 628 @param url A url that identifies the test to run. 629 @param tag An optional keyword argument that will be added to the 630 test and subdir name. 631 @param subdir_tag An optional keyword argument that will be added 632 to the subdir name. 633 634 @returns Test status 635 @see: client/common_lib/error.py, exit_status 636 """ 637 (subdir, testname, group_func, timeout) = self._run_test_base(url, 638 *args, 639 **dargs) 640 try: 641 self._rungroup(subdir, testname, group_func, timeout) 642 return 'GOOD' 643 except error.TestBaseException, detail: 644 return detail.exit_status 645 646 647 def _rungroup(self, subdir, testname, function, timeout, *args, **dargs): 648 """\ 649 subdir: 650 name of the group 651 testname: 652 name of the test to run, or support step 653 function: 654 subroutine to run 655 *args: 656 arguments for the function 657 658 Returns the result of the passed in function 659 """ 660 661 try: 662 optional_fields = None 663 if timeout: 664 optional_fields = {} 665 optional_fields['timeout'] = timeout 666 self.record('START', subdir, testname, 667 optional_fields=optional_fields) 668 669 self._state.set('client', 'unexpected_reboot', (subdir, testname)) 670 try: 671 result = function(*args, **dargs) 672 self.record('END GOOD', subdir, testname) 673 return result 674 except error.TestBaseException, e: 675 self.record('END %s' % e.exit_status, subdir, testname) 676 raise 677 except error.JobError, e: 678 self.record('END ABORT', subdir, testname) 679 raise 680 except Exception, e: 681 # This should only ever happen due to a bug in the given 682 # function's code. The common case of being called by 683 # run_test() will never reach this. If a control file called 684 # run_group() itself, bugs in its function will be caught 685 # here. 
686 err_msg = str(e) + '\n' + traceback.format_exc() 687 self.record('END ERROR', subdir, testname, err_msg) 688 raise 689 finally: 690 self._state.discard('client', 'unexpected_reboot') 691 692 693 def run_group(self, function, tag=None, **dargs): 694 """ 695 Run a function nested within a group level. 696 697 function: 698 Callable to run. 699 tag: 700 An optional tag name for the group. If None (default) 701 function.__name__ will be used. 702 **dargs: 703 Named arguments for the function. 704 """ 705 if tag: 706 name = tag 707 else: 708 name = function.__name__ 709 710 try: 711 return self._rungroup(subdir=None, testname=name, 712 function=function, timeout=None, **dargs) 713 except (SystemExit, error.TestBaseException): 714 raise 715 # If there was a different exception, turn it into a TestError. 716 # It will be caught by step_engine or _run_step_fn. 717 except Exception, e: 718 raise error.UnhandledTestError(e) 719 720 721 def cpu_count(self): 722 return utils.count_cpus() # use total system count 723 724 725 def start_reboot(self): 726 self.record('START', None, 'reboot') 727 self.record('GOOD', None, 'reboot.start') 728 729 730 def _record_reboot_failure(self, subdir, operation, status, 731 running_id=None): 732 self.record("ABORT", subdir, operation, status) 733 if not running_id: 734 running_id = utils.running_os_ident() 735 kernel = {"kernel": running_id.split("::")[0]} 736 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) 737 738 739 def _check_post_reboot(self, subdir, running_id=None): 740 """ 741 Function to perform post boot checks such as if the system configuration 742 has changed across reboots (specifically, CPUs and partitions). 743 744 @param subdir: The subdir to use in the job.record call. 745 @param running_id: An optional running_id to include in the reboot 746 failure log message 747 748 @raise JobError: Raised if the current configuration does not match the 749 pre-reboot configuration. 
750 """ 751 # check to see if any partitions have changed 752 partition_list = partition_lib.get_partition_list(self, 753 exclude_swap=False) 754 mount_info = partition_lib.get_mount_info(partition_list) 755 old_mount_info = self._state.get('client', 'mount_info') 756 if mount_info != old_mount_info: 757 new_entries = mount_info - old_mount_info 758 old_entries = old_mount_info - mount_info 759 description = ("mounted partitions are different after reboot " 760 "(old entries: %s, new entries: %s)" % 761 (old_entries, new_entries)) 762 self._record_reboot_failure(subdir, "reboot.verify_config", 763 description, running_id=running_id) 764 raise error.JobError("Reboot failed: %s" % description) 765 766 # check to see if any CPUs have changed 767 cpu_count = utils.count_cpus() 768 old_count = self._state.get('client', 'cpu_count') 769 if cpu_count != old_count: 770 description = ('Number of CPUs changed after reboot ' 771 '(old count: %d, new count: %d)' % 772 (old_count, cpu_count)) 773 self._record_reboot_failure(subdir, 'reboot.verify_config', 774 description, running_id=running_id) 775 raise error.JobError('Reboot failed: %s' % description) 776 777 778 def end_reboot(self, subdir, kernel, patches, running_id=None): 779 self._check_post_reboot(subdir, running_id=running_id) 780 781 # strip ::<timestamp> from the kernel version if present 782 kernel = kernel.split("::")[0] 783 kernel_info = {"kernel": kernel} 784 for i, patch in enumerate(patches): 785 kernel_info["patch%d" % i] = patch 786 self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info) 787 788 789 def end_reboot_and_verify(self, expected_when, expected_id, subdir, 790 type='src', patches=[]): 791 """ Check the passed kernel identifier against the command line 792 and the running kernel, abort the job on missmatch. 
""" 793 794 logging.info("POST BOOT: checking booted kernel " 795 "mark=%d identity='%s' type='%s'", 796 expected_when, expected_id, type) 797 798 running_id = utils.running_os_ident() 799 800 cmdline = utils.read_one_line("/proc/cmdline") 801 802 find_sum = re.compile(r'.*IDENT=(\d+)') 803 m = find_sum.match(cmdline) 804 cmdline_when = -1 805 if m: 806 cmdline_when = int(m.groups()[0]) 807 808 # We have all the facts, see if they indicate we 809 # booted the requested kernel or not. 810 bad = False 811 if (type == 'src' and expected_id != running_id or 812 type == 'rpm' and 813 not running_id.startswith(expected_id + '::')): 814 logging.error("Kernel identifier mismatch") 815 bad = True 816 if expected_when != cmdline_when: 817 logging.error("Kernel command line mismatch") 818 bad = True 819 820 if bad: 821 logging.error(" Expected Ident: " + expected_id) 822 logging.error(" Running Ident: " + running_id) 823 logging.error(" Expected Mark: %d", expected_when) 824 logging.error("Command Line Mark: %d", cmdline_when) 825 logging.error(" Command Line: " + cmdline) 826 827 self._record_reboot_failure(subdir, "reboot.verify", "boot failure", 828 running_id=running_id) 829 raise error.JobError("Reboot returned with the wrong kernel") 830 831 self.record('GOOD', subdir, 'reboot.verify', 832 utils.running_os_full_version()) 833 self.end_reboot(subdir, expected_id, patches, running_id=running_id) 834 835 836 def partition(self, device, loop_size=0, mountpoint=None): 837 """ 838 Work with a machine partition 839 840 @param device: e.g. /dev/sda2, /dev/sdb1 etc... 841 @param mountpoint: Specify a directory to mount to. If not specified 842 autotest tmp directory will be used. 843 @param loop_size: Size of loopback device (in MB). Defaults to 0. 
844 845 @return: A L{client.bin.partition.partition} object 846 """ 847 848 if not mountpoint: 849 mountpoint = self.tmpdir 850 return partition_lib.partition(self, device, loop_size, mountpoint) 851 852 @utils.deprecated 853 def filesystem(self, device, mountpoint=None, loop_size=0): 854 """ Same as partition 855 856 @deprecated: Use partition method instead 857 """ 858 return self.partition(device, loop_size, mountpoint) 859 860 861 def enable_external_logging(self): 862 pass 863 864 865 def disable_external_logging(self): 866 pass 867 868 869 def reboot_setup(self): 870 # save the partition list and mount points, as well as the cpu count 871 partition_list = partition_lib.get_partition_list(self, 872 exclude_swap=False) 873 mount_info = partition_lib.get_mount_info(partition_list) 874 self._state.set('client', 'mount_info', mount_info) 875 self._state.set('client', 'cpu_count', utils.count_cpus()) 876 877 878 def reboot(self, tag=LAST_BOOT_TAG): 879 if tag == LAST_BOOT_TAG: 880 tag = self.last_boot_tag 881 else: 882 self.last_boot_tag = tag 883 884 self.reboot_setup() 885 self.harness.run_reboot() 886 default = self.config_get('boot.set_default') 887 if default: 888 self.bootloader.set_default(tag) 889 else: 890 self.bootloader.boot_once(tag) 891 892 # HACK: using this as a module sometimes hangs shutdown, so if it's 893 # installed unload it first 894 utils.system("modprobe -r netconsole", ignore_status=True) 895 896 # sync first, so that a sync during shutdown doesn't time out 897 utils.system("sync; sync", ignore_status=True) 898 899 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 900 self.quit() 901 902 903 def noop(self, text): 904 logging.info("job: noop: " + text) 905 906 907 @_run_test_complete_on_exit 908 def parallel(self, *tasklist): 909 """Run tasks in parallel""" 910 911 pids = [] 912 old_log_filename = self._logger.global_filename 913 for i, task in enumerate(tasklist): 914 assert isinstance(task, (tuple, list)) 915 
self._logger.global_filename = old_log_filename + (".%d" % i) 916 def task_func(): 917 # stub out _record_indent with a process-local one 918 base_record_indent = self._record_indent 919 proc_local = self._job_state.property_factory( 920 '_state', '_record_indent.%d' % os.getpid(), 921 base_record_indent, namespace='client') 922 self.__class__._record_indent = proc_local 923 task[0](*task[1:]) 924 pids.append(parallel.fork_start(self.resultdir, task_func)) 925 926 old_log_path = os.path.join(self.resultdir, old_log_filename) 927 old_log = open(old_log_path, "a") 928 exceptions = [] 929 for i, pid in enumerate(pids): 930 # wait for the task to finish 931 try: 932 parallel.fork_waitfor(self.resultdir, pid) 933 except Exception, e: 934 exceptions.append(e) 935 # copy the logs from the subtask into the main log 936 new_log_path = old_log_path + (".%d" % i) 937 if os.path.exists(new_log_path): 938 new_log = open(new_log_path) 939 old_log.write(new_log.read()) 940 new_log.close() 941 old_log.flush() 942 os.remove(new_log_path) 943 old_log.close() 944 945 self._logger.global_filename = old_log_filename 946 947 # handle any exceptions raised by the parallel tasks 948 if exceptions: 949 msg = "%d task(s) failed in job.parallel" % len(exceptions) 950 raise error.JobError(msg) 951 952 953 def quit(self): 954 # XXX: should have a better name. 955 self.harness.run_pause() 956 raise error.JobContinue("more to come") 957 958 959 def complete(self, status): 960 """Write pending TAP reports, clean up, and exit""" 961 # write out TAP reports 962 if self._tap.do_tap_report: 963 self._tap.write() 964 self._tap._write_tap_archive() 965 966 # write out a job HTML report 967 try: 968 html_report.create_report(self.resultdir) 969 except Exception, e: 970 logging.error("Error writing job HTML report: %s", e) 971 972 # We are about to exit 'complete' so clean up the control file. 
973 dest = os.path.join(self.resultdir, os.path.basename(self._state_file)) 974 shutil.move(self._state_file, dest) 975 976 self.harness.run_complete() 977 self.disable_external_logging() 978 sys.exit(status) 979 980 981 def _load_state(self): 982 # grab any initial state and set up $CONTROL.state as the backing file 983 init_state_file = self.control + '.init.state' 984 self._state_file = self.control + '.state' 985 if os.path.exists(init_state_file): 986 shutil.move(init_state_file, self._state_file) 987 self._state.set_backing_file(self._state_file) 988 989 # initialize the state engine, if necessary 990 has_steps = self._state.has('client', 'steps') 991 if not self._is_continuation and has_steps: 992 raise RuntimeError('Loaded state can only contain client.steps if ' 993 'this is a continuation') 994 995 if not has_steps: 996 logging.debug('Initializing the state engine') 997 self._state.set('client', 'steps', []) 998 999 1000 def handle_persistent_option(self, options, option_name): 1001 """ 1002 Select option from command line or persistent state. 1003 Store selected option to allow standalone client to continue 1004 after reboot with previously selected options. 1005 Priority: 1006 1. explicitly specified via command line 1007 2. stored in state file (if continuing job '-c') 1008 3. default == None 1009 """ 1010 option = None 1011 cmd_line_option = getattr(options, option_name) 1012 if cmd_line_option: 1013 option = cmd_line_option 1014 self._state.set('client', option_name, option) 1015 else: 1016 stored_option = self._state.get('client', option_name, None) 1017 if stored_option: 1018 option = stored_option 1019 logging.debug('Persistent option %s now set to %s', option_name, option) 1020 return option 1021 1022 1023 def __create_step_tuple(self, fn, args, dargs): 1024 # Legacy code passes in an array where the first arg is 1025 # the function or its name. 
def next_step(self, fn, *args, **dargs):
    """Queue a new step at the current insertion point.

    The step is inserted at self._next_step_index in the 'client'/'steps'
    list, i.e. after steps queued earlier while running the current step
    but ahead of steps queued by previous steps.  The index is then
    advanced and the updated list is written back to the job state.
    """
    pending = self._state.get('client', 'steps')
    new_step = self.__create_step_tuple(fn, args, dargs)
    pending.insert(self._next_step_index, new_step)
    self._next_step_index += 1
    self._state.set('client', 'steps', pending)
1080 except error.TestNAError, detail: 1081 self.record(detail.exit_status, None, fn, str(detail)) 1082 except Exception, detail: 1083 raise error.UnhandledJobError(detail) 1084 1085 1086 def _create_frame(self, global_vars, ancestry, fn_name): 1087 """Set up the environment like it would have been when this 1088 function was first defined. 1089 1090 Child step engine 'implementations' must have 'return locals()' 1091 at end end of their steps. Because of this, we can call the 1092 parent function and get back all child functions (i.e. those 1093 defined within it). 1094 1095 Unfortunately, the call stack of the function calling 1096 job.next_step might have been deeper than the function it 1097 added. In order to make sure that the environment is what it 1098 should be, we need to then pop off the frames we built until 1099 we find the frame where the function was first defined.""" 1100 1101 # The copies ensure that the parent frames are not modified 1102 # while building child frames. This matters if we then 1103 # pop some frames in the next part of this function. 1104 current_frame = copy.copy(global_vars) 1105 frames = [current_frame] 1106 for steps_fn_name in ancestry: 1107 ret = self._run_step_fn(current_frame, steps_fn_name, [], {}) 1108 current_frame = copy.copy(ret) 1109 frames.append(current_frame) 1110 1111 # Walk up the stack frames until we find the place fn_name was defined. 1112 while len(frames) > 2: 1113 if fn_name not in frames[-2]: 1114 break 1115 if frames[-2][fn_name] != frames[-1][fn_name]: 1116 break 1117 frames.pop() 1118 ancestry.pop() 1119 1120 return (frames[-1], ancestry) 1121 1122 1123 def _add_step_init(self, local_vars, current_function): 1124 """If the function returned a dictionary that includes a 1125 function named 'step_init', prepend it to our list of steps. 
1126 This will only get run the first time a function with a nested 1127 use of the step engine is run.""" 1128 1129 if (isinstance(local_vars, dict) and 1130 'step_init' in local_vars and 1131 callable(local_vars['step_init'])): 1132 # The init step is a child of the function 1133 # we were just running. 1134 self._current_step_ancestry.append(current_function) 1135 self.next_step_prepend('step_init') 1136 1137 1138 def step_engine(self): 1139 """The multi-run engine used when the control file defines step_init. 1140 1141 Does the next step. 1142 """ 1143 1144 # Set up the environment and then interpret the control file. 1145 # Some control files will have code outside of functions, 1146 # which means we need to have our state engine initialized 1147 # before reading in the file. 1148 global_control_vars = {'job': self, 1149 'args': self.args} 1150 exec(JOB_PREAMBLE, global_control_vars, global_control_vars) 1151 try: 1152 execfile(self.control, global_control_vars, global_control_vars) 1153 except error.TestNAError, detail: 1154 self.record(detail.exit_status, None, self.control, str(detail)) 1155 except SystemExit: 1156 raise # Send error.JobContinue and JobComplete on up to runjob. 1157 except Exception, detail: 1158 # Syntax errors or other general Python exceptions coming out of 1159 # the top level of the control file itself go through here. 1160 raise error.UnhandledJobError(detail) 1161 1162 # If we loaded in a mid-job state file, then we presumably 1163 # know what steps we have yet to run. 
class disk_usage_monitor:
    """Warn when free space on a device shrinks faster than a set rate.

    Usage is start() -> run the workload -> stop(); the watch() decorator
    wraps that sequence around a function call.
    """

    def __init__(self, logging_func, device, max_mb_per_hour):
        """
        @param logging_func: callable invoked with a single message string
                when the limit is exceeded.
        @param device: value handed to utils.freespace() to sample usage.
        @param max_mb_per_hour: allowed consumption rate in MB/hour; a
                falsy value disables the check.
        """
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        """Record the current free space and wall-clock time."""
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        """Compare usage since start() with the limit and warn if exceeded.

        Does nothing when no maximum rate was configured.
        """
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate (the free-space delta is treated as bytes)
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom.

        The monitor is constructed from the decorator arguments on every
        call.  The wrapped function's return value is passed through to
        the caller and stop() runs even if the function raises."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    # propagate the result instead of silently dropping it
                    return func(*args, **dargs)
                finally:
                    monitor.stop()
            # keep the wrapped callable identifiable in logs and tracebacks
            watched_func.__name__ = getattr(func, '__name__',
                                            watched_func.__name__)
            watched_func.__doc__ = getattr(func, '__doc__', None)
            return watched_func
        return decorator
1279 """ 1280 control = os.path.abspath(control) 1281 state = control + '.state' 1282 # Ensure state file is cleaned up before the job starts to run if autotest 1283 # is not running with the --continue flag 1284 if not options.cont and os.path.isfile(state): 1285 logging.debug('Cleaning up previously found state file') 1286 os.remove(state) 1287 1288 # instantiate the job object ready for the control file. 1289 myjob = None 1290 try: 1291 # Check that the control file is valid 1292 if not os.path.exists(control): 1293 raise error.JobError(control + ": control file not found") 1294 1295 # When continuing, the job is complete when there is no 1296 # state file, ensure we don't try and continue. 1297 if options.cont and not os.path.exists(state): 1298 raise error.JobComplete("all done") 1299 1300 myjob = job(control=control, drop_caches=drop_caches, options=options) 1301 1302 # Load in the users control file, may do any one of: 1303 # 1) execute in toto 1304 # 2) define steps, and select the first via next_step() 1305 myjob.step_engine() 1306 1307 except error.JobContinue: 1308 sys.exit(5) 1309 1310 except error.JobComplete: 1311 sys.exit(1) 1312 1313 except error.JobError, instance: 1314 logging.error("JOB ERROR: " + str(instance)) 1315 if myjob: 1316 command = None 1317 if len(instance.args) > 1: 1318 command = instance.args[1] 1319 myjob.record('ABORT', None, command, str(instance)) 1320 myjob.record('END ABORT', None, None, str(instance)) 1321 assert myjob._record_indent == 0 1322 myjob.complete(1) 1323 else: 1324 sys.exit(1) 1325 1326 except Exception, e: 1327 # NOTE: job._run_step_fn and job.step_engine will turn things into 1328 # a JobError for us. If we get here, its likely an autotest bug. 
# Pick the base class for ``job``.  import_site_class is given this file, the
# site module path, the class name to look up and base_client_job.
# NOTE(review): presumably it returns the site-specific ``site_job`` class
# when that module is available and base_client_job otherwise -- confirm
# against utils.import_site_class.
site_job = utils.import_site_class(
    __file__, "autotest_lib.client.bin.site_job", "site_job", base_client_job)

class job(site_job):
    # Concrete job class instantiated by runjob(); it adds nothing of its
    # own on top of the base selected above.
    pass