job.py revision ce1b0620fddfb6efa33cc9441d9c91ce06b30de3
1"""The main job wrapper 2 3This is the core infrastructure. 4 5Copyright Andy Whitcroft, Martin J. Bligh 2006 6""" 7 8import copy, os, platform, re, shutil, sys, time, traceback, types, glob 9import logging, getpass, errno, weakref 10import cPickle as pickle 11from autotest_lib.client.bin import client_logging_config 12from autotest_lib.client.bin import utils, parallel, kernel, xen 13from autotest_lib.client.bin import profilers, boottool, harness 14from autotest_lib.client.bin import config, sysinfo, test, local_host 15from autotest_lib.client.bin import partition as partition_lib 16from autotest_lib.client.common_lib import base_job 17from autotest_lib.client.common_lib import error, barrier, log, logging_manager 18from autotest_lib.client.common_lib import base_packages, packages 19from autotest_lib.client.common_lib import global_config 20from autotest_lib.client.tools import html_report 21 22 23LAST_BOOT_TAG = object() 24JOB_PREAMBLE = """ 25from autotest_lib.client.common_lib.error import * 26from autotest_lib.client.bin.utils import * 27""" 28 29 30class StepError(error.AutotestError): 31 pass 32 33class NotAvailableError(error.AutotestError): 34 pass 35 36 37 38def _run_test_complete_on_exit(f): 39 """Decorator for job methods that automatically calls 40 self.harness.run_test_complete when the method exits, if appropriate.""" 41 def wrapped(self, *args, **dargs): 42 try: 43 return f(self, *args, **dargs) 44 finally: 45 if self._logger.global_filename == 'status': 46 self.harness.run_test_complete() 47 if self.drop_caches: 48 logging.debug("Dropping caches") 49 utils.drop_caches() 50 wrapped.__name__ = f.__name__ 51 wrapped.__doc__ = f.__doc__ 52 wrapped.__dict__.update(f.__dict__) 53 return wrapped 54 55 56class status_indenter(base_job.status_indenter): 57 """Provide a status indenter that is backed by job._record_prefix.""" 58 def __init__(self, job): 59 self.job = weakref.proxy(job) # avoid a circular reference 60 61 62 @property 63 def indent(self): 
64 return self.job._record_indent 65 66 67 def increment(self): 68 self.job._record_indent += 1 69 70 71 def decrement(self): 72 self.job._record_indent -= 1 73 74 75class base_client_job(base_job.base_job): 76 """The client-side concrete implementation of base_job. 77 78 Optional properties provided by this implementation: 79 control 80 bootloader 81 harness 82 """ 83 84 _WARNING_DISABLE_DELAY = 5 85 86 # _record_indent is a persistent property, but only on the client 87 _job_state = base_job.base_job._job_state 88 _record_indent = _job_state.property_factory( 89 '_state', '_record_indent', 0, namespace='client') 90 _max_disk_usage_rate = _job_state.property_factory( 91 '_state', '_max_disk_usage_rate', 0.0, namespace='client') 92 93 94 def __init__(self, control, options, drop_caches=True, 95 extra_copy_cmdline=None): 96 """ 97 Prepare a client side job object. 98 99 @param control: The control file (pathname of). 100 @param options: an object which includes: 101 jobtag: The job tag string (eg "default"). 102 cont: If this is the continuation of this job. 103 harness_type: An alternative server harness. [None] 104 use_external_logging: If true, the enable_external_logging 105 method will be called during construction. [False] 106 @param drop_caches: If true, utils.drop_caches() is called before and 107 between all tests. 
[True] 108 @param extra_copy_cmdline: list of additional /proc/cmdline arguments to 109 copy from the running kernel to all the installed kernels with 110 this job 111 """ 112 super(base_client_job, self).__init__(options=options) 113 self._pre_record_init(control, options) 114 try: 115 self._post_record_init(control, options, drop_caches, 116 extra_copy_cmdline) 117 except Exception, err: 118 self.record( 119 'ABORT', None, None,'client.bin.job.__init__ failed: %s' % 120 str(err)) 121 raise 122 123 124 @classmethod 125 def _get_environ_autodir(cls): 126 return os.environ['AUTODIR'] 127 128 129 @classmethod 130 def _find_base_directories(cls): 131 """ 132 Determine locations of autodir and clientdir (which are the same) 133 using os.environ. Serverdir does not exist in this context. 134 """ 135 autodir = clientdir = cls._get_environ_autodir() 136 return autodir, clientdir, None 137 138 139 @classmethod 140 def _parse_args(cls, args): 141 return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args) 142 143 144 def _find_resultdir(self, options): 145 """ 146 Determine the directory for storing results. On a client this is 147 always <autodir>/results/<tag>, where tag is passed in on the command 148 line as an option. 149 """ 150 return os.path.join(self.autodir, 'results', options.tag) 151 152 153 def _get_status_logger(self): 154 """Return a reference to the status logger.""" 155 return self._logger 156 157 158 def _pre_record_init(self, control, options): 159 """ 160 Initialization function that should peform ONLY the required 161 setup so that the self.record() method works. 162 163 As of now self.record() needs self.resultdir, self._group_level, 164 self.harness and of course self._logger. 
165 """ 166 if not options.cont: 167 self._cleanup_debugdir_files() 168 self._cleanup_results_dir() 169 170 logging_manager.configure_logging( 171 client_logging_config.ClientLoggingConfig(), 172 results_dir=self.resultdir, 173 verbose=options.verbose) 174 logging.info('Writing results to %s', self.resultdir) 175 176 # init_group_level needs the state 177 self.control = os.path.realpath(control) 178 self._is_continuation = options.cont 179 self._current_step_ancestry = [] 180 self._next_step_index = 0 181 self._load_state() 182 183 _harness = self.handle_persistent_option(options, 'harness') 184 _harness_args = self.handle_persistent_option(options, 'harness_args') 185 186 self.harness = harness.select(_harness, self, _harness_args) 187 188 # set up the status logger 189 def client_job_record_hook(entry): 190 msg_tag = '' 191 if '.' in self._logger.global_filename: 192 msg_tag = self._logger.global_filename.split('.', 1)[1] 193 # send the entry to the job harness 194 message = '\n'.join([entry.message] + entry.extra_message_lines) 195 rendered_entry = self._logger.render_entry(entry) 196 self.harness.test_status_detail(entry.status_code, entry.subdir, 197 entry.operation, message, msg_tag, 198 entry.fields) 199 self.harness.test_status(rendered_entry, msg_tag) 200 # send the entry to stdout, if it's enabled 201 logging.info(rendered_entry) 202 self._logger = base_job.status_logger( 203 self, status_indenter(self), record_hook=client_job_record_hook, 204 tap_writer=self._tap) 205 206 def _post_record_init(self, control, options, drop_caches, 207 extra_copy_cmdline): 208 """ 209 Perform job initialization not required by self.record(). 
210 """ 211 self._init_drop_caches(drop_caches) 212 213 self._init_packages() 214 215 self.sysinfo = sysinfo.sysinfo(self.resultdir) 216 self._load_sysinfo_state() 217 218 if not options.cont: 219 download = os.path.join(self.testdir, 'download') 220 if not os.path.exists(download): 221 os.mkdir(download) 222 223 shutil.copyfile(self.control, 224 os.path.join(self.resultdir, 'control')) 225 226 self.control = control 227 228 self.logging = logging_manager.get_logging_manager( 229 manage_stdout_and_stderr=True, redirect_fds=True) 230 self.logging.start_logging() 231 232 self._config = config.config(self) 233 self.profilers = profilers.profilers(self) 234 235 self._init_bootloader() 236 237 self.machines = [options.hostname] 238 self.hosts = set([local_host.LocalHost(hostname=options.hostname, 239 bootloader=self.bootloader)]) 240 241 self.args = [] 242 if options.args: 243 self.args = self._parse_args(options.args) 244 245 if options.user: 246 self.user = options.user 247 else: 248 self.user = getpass.getuser() 249 250 self.sysinfo.log_per_reboot_data() 251 252 if not options.cont: 253 self.record('START', None, None) 254 255 self.harness.run_start() 256 257 if options.log: 258 self.enable_external_logging() 259 260 self._init_cmdline(extra_copy_cmdline) 261 262 self.num_tests_run = None 263 self.num_tests_failed = None 264 265 self.warning_loggers = None 266 self.warning_manager = None 267 268 269 def _init_drop_caches(self, drop_caches): 270 """ 271 Perform the drop caches initialization. 272 """ 273 self.drop_caches_between_iterations = ( 274 global_config.global_config.get_config_value('CLIENT', 275 'drop_caches_between_iterations', 276 type=bool, default=True)) 277 self.drop_caches = drop_caches 278 if self.drop_caches: 279 logging.debug("Dropping caches") 280 utils.drop_caches() 281 282 283 def _init_bootloader(self): 284 """ 285 Perform boottool initialization. 
286 """ 287 tool = self.config_get('boottool.executable') 288 self.bootloader = boottool.boottool(tool) 289 290 291 def _init_packages(self): 292 """ 293 Perform the packages support initialization. 294 """ 295 self.pkgmgr = packages.PackageManager( 296 self.autodir, run_function_dargs={'timeout':3600}) 297 298 299 def _init_cmdline(self, extra_copy_cmdline): 300 """ 301 Initialize default cmdline for booted kernels in this job. 302 """ 303 copy_cmdline = set(['console']) 304 if extra_copy_cmdline is not None: 305 copy_cmdline.update(extra_copy_cmdline) 306 307 # extract console= and other args from cmdline and add them into the 308 # base args that we use for all kernels we install 309 cmdline = utils.read_one_line('/proc/cmdline') 310 kernel_args = [] 311 for karg in cmdline.split(): 312 for param in copy_cmdline: 313 if karg.startswith(param) and \ 314 (len(param) == len(karg) or karg[len(param)] == '='): 315 kernel_args.append(karg) 316 self.config_set('boot.default_args', ' '.join(kernel_args)) 317 318 319 def _cleanup_results_dir(self): 320 """Delete everything in resultsdir""" 321 assert os.path.exists(self.resultdir) 322 list_files = glob.glob('%s/*' % self.resultdir) 323 for f in list_files: 324 if os.path.isdir(f): 325 shutil.rmtree(f) 326 elif os.path.isfile(f): 327 os.remove(f) 328 329 330 def _cleanup_debugdir_files(self): 331 """ 332 Delete any leftover debugdir files 333 """ 334 list_files = glob.glob("/tmp/autotest_results_dir.*") 335 for f in list_files: 336 os.remove(f) 337 338 339 def disable_warnings(self, warning_type): 340 self.record("INFO", None, None, 341 "disabling %s warnings" % warning_type, 342 {"warnings.disable": warning_type}) 343 time.sleep(self._WARNING_DISABLE_DELAY) 344 345 346 def enable_warnings(self, warning_type): 347 time.sleep(self._WARNING_DISABLE_DELAY) 348 self.record("INFO", None, None, 349 "enabling %s warnings" % warning_type, 350 {"warnings.enable": warning_type}) 351 352 353 def monitor_disk_usage(self, max_rate): 
354 """\ 355 Signal that the job should monitor disk space usage on / 356 and generate a warning if a test uses up disk space at a 357 rate exceeding 'max_rate'. 358 359 Parameters: 360 max_rate - the maximium allowed rate of disk consumption 361 during a test, in MB/hour, or 0 to indicate 362 no limit. 363 """ 364 self._max_disk_usage_rate = max_rate 365 366 367 def relative_path(self, path): 368 """\ 369 Return a patch relative to the job results directory 370 """ 371 head = len(self.resultdir) + 1 # remove the / inbetween 372 return path[head:] 373 374 375 def control_get(self): 376 return self.control 377 378 379 def control_set(self, control): 380 self.control = os.path.abspath(control) 381 382 383 def harness_select(self, which, harness_args): 384 self.harness = harness.select(which, self, harness_args) 385 386 387 def config_set(self, name, value): 388 self._config.set(name, value) 389 390 391 def config_get(self, name): 392 return self._config.get(name) 393 394 395 def setup_dirs(self, results_dir, tmp_dir): 396 if not tmp_dir: 397 tmp_dir = os.path.join(self.tmpdir, 'build') 398 if not os.path.exists(tmp_dir): 399 os.mkdir(tmp_dir) 400 if not os.path.isdir(tmp_dir): 401 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 402 raise ValueError(e_msg) 403 404 # We label the first build "build" and then subsequent ones 405 # as "build.2", "build.3", etc. Whilst this is a little bit 406 # inconsistent, 99.9% of jobs will only have one build 407 # (that's not done as kernbench, sparse, or buildtest), 408 # so it works out much cleaner. One of life's comprimises. 
409 if not results_dir: 410 results_dir = os.path.join(self.resultdir, 'build') 411 i = 2 412 while os.path.exists(results_dir): 413 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 414 i += 1 415 if not os.path.exists(results_dir): 416 os.mkdir(results_dir) 417 418 return (results_dir, tmp_dir) 419 420 421 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 422 kjob = None ): 423 """Summon a xen object""" 424 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 425 build_dir = 'xen' 426 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, 427 leave, kjob) 428 429 430 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 431 """Summon a kernel object""" 432 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 433 build_dir = 'linux' 434 return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir, 435 build_dir, leave) 436 437 438 def barrier(self, *args, **kwds): 439 """Create a barrier object""" 440 return barrier.barrier(*args, **kwds) 441 442 443 def install_pkg(self, name, pkg_type, install_dir): 444 ''' 445 This method is a simple wrapper around the actual package 446 installation method in the Packager class. This is used 447 internally by the profilers, deps and tests code. 448 name : name of the package (ex: sleeptest, dbench etc.) 449 pkg_type : Type of the package (ex: test, dep etc.) 450 install_dir : The directory in which the source is actually 451 untarred into. (ex: client/profilers/<name> for profilers) 452 ''' 453 if self.pkgmgr.repositories: 454 self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir) 455 456 457 def add_repository(self, repo_urls): 458 ''' 459 Adds the repository locations to the job so that packages 460 can be fetched from them when needed. 
The repository list 461 needs to be a string list 462 Ex: job.add_repository(['http://blah1','http://blah2']) 463 ''' 464 for repo_url in repo_urls: 465 self.pkgmgr.add_repository(repo_url) 466 467 # Fetch the packages' checksum file that contains the checksums 468 # of all the packages if it is not already fetched. The checksum 469 # is always fetched whenever a job is first started. This 470 # is not done in the job's constructor as we don't have the list of 471 # the repositories there (and obviously don't care about this file 472 # if we are not using the repos) 473 try: 474 checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir, 475 base_packages.CHECKSUM_FILE) 476 self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE, 477 checksum_file_path, use_checksum=False) 478 except error.PackageFetchError: 479 # packaging system might not be working in this case 480 # Silently fall back to the normal case 481 pass 482 483 484 def require_gcc(self): 485 """ 486 Test whether gcc is installed on the machine. 487 """ 488 # check if gcc is installed on the system. 489 try: 490 utils.system('which gcc') 491 except error.CmdError, e: 492 raise NotAvailableError('gcc is required by this job and is ' 493 'not available on the system') 494 495 496 def setup_dep(self, deps): 497 """Set up the dependencies for this test. 498 deps is a list of libraries required for this test. 499 """ 500 # Fetch the deps from the repositories and set them up. 501 for dep in deps: 502 dep_dir = os.path.join(self.autodir, 'deps', dep) 503 # Search for the dependency in the repositories if specified, 504 # else check locally. 
505 try: 506 self.install_pkg(dep, 'dep', dep_dir) 507 except error.PackageInstallError: 508 # see if the dep is there locally 509 pass 510 511 # dep_dir might not exist if it is not fetched from the repos 512 if not os.path.exists(dep_dir): 513 raise error.TestError("Dependency %s does not exist" % dep) 514 515 os.chdir(dep_dir) 516 if execfile('%s.py' % dep, {}) is None: 517 logging.info('Dependency %s successfuly built', dep) 518 519 520 def _runtest(self, url, tag, timeout, args, dargs): 521 try: 522 l = lambda : test.runtest(self, url, tag, args, dargs) 523 pid = parallel.fork_start(self.resultdir, l) 524 525 if timeout: 526 logging.debug('Waiting for pid %d for %d seconds', pid, timeout) 527 parallel.fork_waitfor_timed(self.resultdir, pid, timeout) 528 else: 529 parallel.fork_waitfor(self.resultdir, pid) 530 531 except error.TestBaseException: 532 # These are already classified with an error type (exit_status) 533 raise 534 except error.JobError: 535 raise # Caught further up and turned into an ABORT. 536 except Exception, e: 537 # Converts all other exceptions thrown by the test regardless 538 # of phase into a TestError(TestBaseException) subclass that 539 # reports them with their full stack trace. 540 raise error.UnhandledTestError(e) 541 542 543 def _run_test_base(self, url, *args, **dargs): 544 """ 545 Prepares arguments and run functions to run_test and run_test_detail. 546 547 @param url A url that identifies the test to run. 548 @param tag An optional keyword argument that will be added to the 549 test and subdir name. 550 @param subdir_tag An optional keyword argument that will be added 551 to the subdir name. 
552 553 @returns: 554 subdir: Test subdirectory 555 testname: Test name 556 group_func: Actual test run function 557 timeout: Test timeout 558 """ 559 group, testname = self.pkgmgr.get_package_name(url, 'test') 560 testname, subdir, tag = self._build_tagged_test_name(testname, dargs) 561 outputdir = self._make_test_outputdir(subdir) 562 563 timeout = dargs.pop('timeout', None) 564 if timeout: 565 logging.debug('Test has timeout: %d sec.', timeout) 566 567 def log_warning(reason): 568 self.record("WARN", subdir, testname, reason) 569 @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate) 570 def group_func(): 571 try: 572 self._runtest(url, tag, timeout, args, dargs) 573 except error.TestBaseException, detail: 574 # The error is already classified, record it properly. 575 self.record(detail.exit_status, subdir, testname, str(detail)) 576 raise 577 else: 578 self.record('GOOD', subdir, testname, 'completed successfully') 579 580 return (subdir, testname, group_func, timeout) 581 582 583 @_run_test_complete_on_exit 584 def run_test(self, url, *args, **dargs): 585 """ 586 Summon a test object and run it. 587 588 @param url A url that identifies the test to run. 589 @param tag An optional keyword argument that will be added to the 590 test and subdir name. 591 @param subdir_tag An optional keyword argument that will be added 592 to the subdir name. 593 594 @returns True if the test passes, False otherwise. 595 """ 596 (subdir, testname, group_func, timeout) = self._run_test_base(url, 597 *args, 598 **dargs) 599 try: 600 self._rungroup(subdir, testname, group_func, timeout) 601 return True 602 except error.TestBaseException: 603 return False 604 # Any other exception here will be given to the caller 605 # 606 # NOTE: The only exception possible from the control file here 607 # is error.JobError as _runtest() turns all others into an 608 # UnhandledTestError that is caught above. 
609 610 611 @_run_test_complete_on_exit 612 def run_test_detail(self, url, *args, **dargs): 613 """ 614 Summon a test object and run it, returning test status. 615 616 @param url A url that identifies the test to run. 617 @param tag An optional keyword argument that will be added to the 618 test and subdir name. 619 @param subdir_tag An optional keyword argument that will be added 620 to the subdir name. 621 622 @returns Test status 623 @see: client/common_lib/error.py, exit_status 624 """ 625 (subdir, testname, group_func, timeout) = self._run_test_base(url, 626 *args, 627 **dargs) 628 try: 629 self._rungroup(subdir, testname, group_func, timeout) 630 return 'GOOD' 631 except error.TestBaseException, detail: 632 return detail.exit_status 633 634 635 def _rungroup(self, subdir, testname, function, timeout, *args, **dargs): 636 """\ 637 subdir: 638 name of the group 639 testname: 640 name of the test to run, or support step 641 function: 642 subroutine to run 643 *args: 644 arguments for the function 645 646 Returns the result of the passed in function 647 """ 648 649 try: 650 optional_fields = None 651 if timeout: 652 optional_fields = {} 653 optional_fields['timeout'] = timeout 654 self.record('START', subdir, testname, 655 optional_fields=optional_fields) 656 657 self._state.set('client', 'unexpected_reboot', (subdir, testname)) 658 try: 659 result = function(*args, **dargs) 660 self.record('END GOOD', subdir, testname) 661 return result 662 except error.TestBaseException, e: 663 self.record('END %s' % e.exit_status, subdir, testname) 664 raise 665 except error.JobError, e: 666 self.record('END ABORT', subdir, testname) 667 raise 668 except Exception, e: 669 # This should only ever happen due to a bug in the given 670 # function's code. The common case of being called by 671 # run_test() will never reach this. If a control file called 672 # run_group() itself, bugs in its function will be caught 673 # here. 
674 err_msg = str(e) + '\n' + traceback.format_exc() 675 self.record('END ERROR', subdir, testname, err_msg) 676 raise 677 finally: 678 self._state.discard('client', 'unexpected_reboot') 679 680 681 def run_group(self, function, tag=None, **dargs): 682 """ 683 Run a function nested within a group level. 684 685 function: 686 Callable to run. 687 tag: 688 An optional tag name for the group. If None (default) 689 function.__name__ will be used. 690 **dargs: 691 Named arguments for the function. 692 """ 693 if tag: 694 name = tag 695 else: 696 name = function.__name__ 697 698 try: 699 return self._rungroup(subdir=None, testname=name, 700 function=function, timeout=None, **dargs) 701 except (SystemExit, error.TestBaseException): 702 raise 703 # If there was a different exception, turn it into a TestError. 704 # It will be caught by step_engine or _run_step_fn. 705 except Exception, e: 706 raise error.UnhandledTestError(e) 707 708 709 def cpu_count(self): 710 return utils.count_cpus() # use total system count 711 712 713 def start_reboot(self): 714 self.record('START', None, 'reboot') 715 self.record('GOOD', None, 'reboot.start') 716 717 718 def _record_reboot_failure(self, subdir, operation, status, 719 running_id=None): 720 self.record("ABORT", subdir, operation, status) 721 if not running_id: 722 running_id = utils.running_os_ident() 723 kernel = {"kernel": running_id.split("::")[0]} 724 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) 725 726 727 def _check_post_reboot(self, subdir, running_id=None): 728 """ 729 Function to perform post boot checks such as if the system configuration 730 has changed across reboots (specifically, CPUs and partitions). 731 732 @param subdir: The subdir to use in the job.record call. 733 @param running_id: An optional running_id to include in the reboot 734 failure log message 735 736 @raise JobError: Raised if the current configuration does not match the 737 pre-reboot configuration. 
738 """ 739 # check to see if any partitions have changed 740 partition_list = partition_lib.get_partition_list(self, 741 exclude_swap=False) 742 mount_info = partition_lib.get_mount_info(partition_list) 743 old_mount_info = self._state.get('client', 'mount_info') 744 if mount_info != old_mount_info: 745 new_entries = mount_info - old_mount_info 746 old_entries = old_mount_info - mount_info 747 description = ("mounted partitions are different after reboot " 748 "(old entries: %s, new entries: %s)" % 749 (old_entries, new_entries)) 750 self._record_reboot_failure(subdir, "reboot.verify_config", 751 description, running_id=running_id) 752 raise error.JobError("Reboot failed: %s" % description) 753 754 # check to see if any CPUs have changed 755 cpu_count = utils.count_cpus() 756 old_count = self._state.get('client', 'cpu_count') 757 if cpu_count != old_count: 758 description = ('Number of CPUs changed after reboot ' 759 '(old count: %d, new count: %d)' % 760 (old_count, cpu_count)) 761 self._record_reboot_failure(subdir, 'reboot.verify_config', 762 description, running_id=running_id) 763 raise error.JobError('Reboot failed: %s' % description) 764 765 766 def end_reboot(self, subdir, kernel, patches, running_id=None): 767 self._check_post_reboot(subdir, running_id=running_id) 768 769 # strip ::<timestamp> from the kernel version if present 770 kernel = kernel.split("::")[0] 771 kernel_info = {"kernel": kernel} 772 for i, patch in enumerate(patches): 773 kernel_info["patch%d" % i] = patch 774 self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info) 775 776 777 def end_reboot_and_verify(self, expected_when, expected_id, subdir, 778 type='src', patches=[]): 779 """ Check the passed kernel identifier against the command line 780 and the running kernel, abort the job on missmatch. 
""" 781 782 logging.info("POST BOOT: checking booted kernel " 783 "mark=%d identity='%s' type='%s'", 784 expected_when, expected_id, type) 785 786 running_id = utils.running_os_ident() 787 788 cmdline = utils.read_one_line("/proc/cmdline") 789 790 find_sum = re.compile(r'.*IDENT=(\d+)') 791 m = find_sum.match(cmdline) 792 cmdline_when = -1 793 if m: 794 cmdline_when = int(m.groups()[0]) 795 796 # We have all the facts, see if they indicate we 797 # booted the requested kernel or not. 798 bad = False 799 if (type == 'src' and expected_id != running_id or 800 type == 'rpm' and 801 not running_id.startswith(expected_id + '::')): 802 logging.error("Kernel identifier mismatch") 803 bad = True 804 if expected_when != cmdline_when: 805 logging.error("Kernel command line mismatch") 806 bad = True 807 808 if bad: 809 logging.error(" Expected Ident: " + expected_id) 810 logging.error(" Running Ident: " + running_id) 811 logging.error(" Expected Mark: %d", expected_when) 812 logging.error("Command Line Mark: %d", cmdline_when) 813 logging.error(" Command Line: " + cmdline) 814 815 self._record_reboot_failure(subdir, "reboot.verify", "boot failure", 816 running_id=running_id) 817 raise error.JobError("Reboot returned with the wrong kernel") 818 819 self.record('GOOD', subdir, 'reboot.verify', 820 utils.running_os_full_version()) 821 self.end_reboot(subdir, expected_id, patches, running_id=running_id) 822 823 824 def partition(self, device, loop_size=0, mountpoint=None): 825 """ 826 Work with a machine partition 827 828 @param device: e.g. /dev/sda2, /dev/sdb1 etc... 829 @param mountpoint: Specify a directory to mount to. If not specified 830 autotest tmp directory will be used. 831 @param loop_size: Size of loopback device (in MB). Defaults to 0. 
832 833 @return: A L{client.bin.partition.partition} object 834 """ 835 836 if not mountpoint: 837 mountpoint = self.tmpdir 838 return partition_lib.partition(self, device, loop_size, mountpoint) 839 840 @utils.deprecated 841 def filesystem(self, device, mountpoint=None, loop_size=0): 842 """ Same as partition 843 844 @deprecated: Use partition method instead 845 """ 846 return self.partition(device, loop_size, mountpoint) 847 848 849 def enable_external_logging(self): 850 pass 851 852 853 def disable_external_logging(self): 854 pass 855 856 857 def reboot_setup(self): 858 # save the partition list and mount points, as well as the cpu count 859 partition_list = partition_lib.get_partition_list(self, 860 exclude_swap=False) 861 mount_info = partition_lib.get_mount_info(partition_list) 862 self._state.set('client', 'mount_info', mount_info) 863 self._state.set('client', 'cpu_count', utils.count_cpus()) 864 865 866 def reboot(self, tag=LAST_BOOT_TAG): 867 if tag == LAST_BOOT_TAG: 868 tag = self.last_boot_tag 869 else: 870 self.last_boot_tag = tag 871 872 self.reboot_setup() 873 self.harness.run_reboot() 874 default = self.config_get('boot.set_default') 875 if default: 876 self.bootloader.set_default(tag) 877 else: 878 self.bootloader.boot_once(tag) 879 880 # HACK: using this as a module sometimes hangs shutdown, so if it's 881 # installed unload it first 882 utils.system("modprobe -r netconsole", ignore_status=True) 883 884 # sync first, so that a sync during shutdown doesn't time out 885 utils.system("sync; sync", ignore_status=True) 886 887 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 888 self.quit() 889 890 891 def noop(self, text): 892 logging.info("job: noop: " + text) 893 894 895 @_run_test_complete_on_exit 896 def parallel(self, *tasklist): 897 """Run tasks in parallel""" 898 899 pids = [] 900 old_log_filename = self._logger.global_filename 901 for i, task in enumerate(tasklist): 902 assert isinstance(task, (tuple, list)) 903 
self._logger.global_filename = old_log_filename + (".%d" % i) 904 def task_func(): 905 # stub out _record_indent with a process-local one 906 base_record_indent = self._record_indent 907 proc_local = self._job_state.property_factory( 908 '_state', '_record_indent.%d' % os.getpid(), 909 base_record_indent, namespace='client') 910 self.__class__._record_indent = proc_local 911 task[0](*task[1:]) 912 pids.append(parallel.fork_start(self.resultdir, task_func)) 913 914 old_log_path = os.path.join(self.resultdir, old_log_filename) 915 old_log = open(old_log_path, "a") 916 exceptions = [] 917 for i, pid in enumerate(pids): 918 # wait for the task to finish 919 try: 920 parallel.fork_waitfor(self.resultdir, pid) 921 except Exception, e: 922 exceptions.append(e) 923 # copy the logs from the subtask into the main log 924 new_log_path = old_log_path + (".%d" % i) 925 if os.path.exists(new_log_path): 926 new_log = open(new_log_path) 927 old_log.write(new_log.read()) 928 new_log.close() 929 old_log.flush() 930 os.remove(new_log_path) 931 old_log.close() 932 933 self._logger.global_filename = old_log_filename 934 935 # handle any exceptions raised by the parallel tasks 936 if exceptions: 937 msg = "%d task(s) failed in job.parallel" % len(exceptions) 938 raise error.JobError(msg) 939 940 941 def quit(self): 942 # XXX: should have a better name. 943 self.harness.run_pause() 944 raise error.JobContinue("more to come") 945 946 947 def complete(self, status): 948 """Write pending TAP reports, clean up, and exit""" 949 # write out TAP reports 950 if self._tap.do_tap_report: 951 self._tap.write() 952 self._tap._write_tap_archive() 953 954 # write out a job HTML report 955 try: 956 html_report.create_report(self.resultdir) 957 except Exception, e: 958 logging.error("Error writing job HTML report: %s", e) 959 960 # We are about to exit 'complete' so clean up the control file. 
961 dest = os.path.join(self.resultdir, os.path.basename(self._state_file)) 962 shutil.move(self._state_file, dest) 963 964 self.harness.run_complete() 965 self.disable_external_logging() 966 sys.exit(status) 967 968 969 def _load_state(self): 970 # grab any initial state and set up $CONTROL.state as the backing file 971 init_state_file = self.control + '.init.state' 972 self._state_file = self.control + '.state' 973 if os.path.exists(init_state_file): 974 shutil.move(init_state_file, self._state_file) 975 self._state.set_backing_file(self._state_file) 976 977 # initialize the state engine, if necessary 978 has_steps = self._state.has('client', 'steps') 979 if not self._is_continuation and has_steps: 980 raise RuntimeError('Loaded state can only contain client.steps if ' 981 'this is a continuation') 982 983 if not has_steps: 984 logging.info('Initializing the state engine') 985 self._state.set('client', 'steps', []) 986 987 988 def handle_persistent_option(self, options, option_name): 989 """ 990 Select option from command line or persistent state. 991 Store selected option to allow standalone client to continue 992 after reboot with previously selected options. 993 Priority: 994 1. explicitly specified via command line 995 2. stored in state file (if continuing job '-c') 996 3. default == None 997 """ 998 option = None 999 cmd_line_option = getattr(options, option_name) 1000 if cmd_line_option: 1001 option = cmd_line_option 1002 self._state.set('client', option_name, option) 1003 else: 1004 stored_option = self._state.get('client', option_name, None) 1005 if stored_option: 1006 option = stored_option 1007 logging.debug('Persistent option %s now set to %s', option_name, option) 1008 return option 1009 1010 1011 def __create_step_tuple(self, fn, args, dargs): 1012 # Legacy code passes in an array where the first arg is 1013 # the function or its name. 
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        # snapshot the ancestry so later mutations don't affect this step
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        # advance the insertion point so repeated next_step() calls
        # within one step keep their relative order
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)



    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        # pass args/dargs in via the exec namespace; fn is a function
        # name, so it is looked up inside local_vars at exec time
        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            # a step declaring N/A is recorded but is not fatal to the
            # job; note that None is returned in this case
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            # anything else escaping a step is treated as a job-level bug
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            # re-run each ancestor; its returned locals() become the
            # child frame
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            # NOTE: the control file is trusted job input; it is executed
            # directly in this process.
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])
        else:
            # if last job failed due to unexpected reboot, record it as fail
            # so harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            # persist the shortened list before running the step, so a
            # continuation resumes with the remaining steps only
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        """Register a shell command whose output is collected as sysinfo."""
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        """Register a file to be captured as part of sysinfo collection."""
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        # add to the per-test or per-boot collection set, then persist
        # so continuations keep the registration
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        # restore sysinfo loggables saved by a previous run, if any
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        # serialize the sysinfo configuration into the persistent job state
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


class disk_usage_monitor:
    """Watches free space on a device between start() and stop() and
    reports (via the supplied logging function) if it was consumed
    faster than max_mb_per_hour."""
    def __init__(self, logging_func, device,
                 max_mb_per_hour):
        self.func = logging_func
        self.device = device
        # maximum tolerated consumption rate; falsy (None/0) disables
        # the warning check entirely
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        # record a baseline of free space and wall-clock time
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    # NOTE(review): func's return value is discarded here --
                    # confirm no caller of a watched function relies on it
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator


def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
1267 """ 1268 control = os.path.abspath(control) 1269 state = control + '.state' 1270 # Ensure state file is cleaned up before the job starts to run if autotest 1271 # is not running with the --continue flag 1272 if not options.cont and os.path.isfile(state): 1273 logging.debug('Cleaning up previously found state file') 1274 os.remove(state) 1275 1276 # instantiate the job object ready for the control file. 1277 myjob = None 1278 try: 1279 # Check that the control file is valid 1280 if not os.path.exists(control): 1281 raise error.JobError(control + ": control file not found") 1282 1283 # When continuing, the job is complete when there is no 1284 # state file, ensure we don't try and continue. 1285 if options.cont and not os.path.exists(state): 1286 raise error.JobComplete("all done") 1287 1288 myjob = job(control=control, drop_caches=drop_caches, options=options) 1289 1290 # Load in the users control file, may do any one of: 1291 # 1) execute in toto 1292 # 2) define steps, and select the first via next_step() 1293 myjob.step_engine() 1294 1295 except error.JobContinue: 1296 sys.exit(5) 1297 1298 except error.JobComplete: 1299 sys.exit(1) 1300 1301 except error.JobError, instance: 1302 logging.error("JOB ERROR: " + str(instance)) 1303 if myjob: 1304 command = None 1305 if len(instance.args) > 1: 1306 command = instance.args[1] 1307 myjob.record('ABORT', None, command, str(instance)) 1308 myjob.record('END ABORT', None, None, str(instance)) 1309 assert myjob._record_indent == 0 1310 myjob.complete(1) 1311 else: 1312 sys.exit(1) 1313 1314 except Exception, e: 1315 # NOTE: job._run_step_fn and job.step_engine will turn things into 1316 # a JobError for us. If we get here, its likely an autotest bug. 
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob.record('END ABORT', None, None, msg)
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob.record('END GOOD', None, None)
    assert myjob._record_indent == 0

    myjob.complete(0)


# site hook: presumably substitutes a site-specific job class when a
# site_job module exists, else the base class -- confirm against
# utils.import_site_class
site_job = utils.import_site_class(
    __file__, "autotest_lib.client.bin.site_job", "site_job", base_client_job)

class job(site_job):
    # the concrete job class instantiated by runjob(); all behavior
    # comes from site_job / base_client_job
    pass