job.py revision 8904d6f41c99d44b01bcd60eab70f2ac5ef25337
1"""The main job wrapper 2 3This is the core infrastructure. 4 5Copyright Andy Whitcroft, Martin J. Bligh 2006 6""" 7 8import copy, os, platform, re, shutil, sys, time, traceback, types, glob 9import logging 10import cPickle as pickle 11from autotest_lib.client.bin import client_logging_config 12from autotest_lib.client.bin import utils, parallel, kernel, xen 13from autotest_lib.client.bin import profilers, boottool, harness 14from autotest_lib.client.bin import config, sysinfo, test, local_host 15from autotest_lib.client.bin import partition as partition_lib 16from autotest_lib.client.common_lib import error, barrier, log, logging_manager 17from autotest_lib.client.common_lib import base_packages, packages 18 19LAST_BOOT_TAG = object() 20NO_DEFAULT = object() 21JOB_PREAMBLE = """ 22from autotest_lib.client.common_lib.error import * 23from autotest_lib.client.bin.utils import * 24""" 25 26 27class StepError(error.AutotestError): 28 pass 29 30class NotAvailableError(error.AutotestError): 31 pass 32 33 34 35def _run_test_complete_on_exit(f): 36 """Decorator for job methods that automatically calls 37 self.harness.run_test_complete when the method exits, if appropriate.""" 38 def wrapped(self, *args, **dargs): 39 try: 40 return f(self, *args, **dargs) 41 finally: 42 if self.log_filename == self.DEFAULT_LOG_FILENAME: 43 self.harness.run_test_complete() 44 if self.drop_caches: 45 logging.debug("Dropping caches") 46 utils.drop_caches() 47 wrapped.__name__ = f.__name__ 48 wrapped.__doc__ = f.__doc__ 49 wrapped.__dict__.update(f.__dict__) 50 return wrapped 51 52 53class base_job(object): 54 """The actual job against which we do everything. 55 56 Properties: 57 autodir 58 The top level autotest directory (/usr/local/autotest). 59 Comes from os.environ['AUTODIR']. 60 bindir 61 <autodir>/bin/ 62 libdir 63 <autodir>/lib/ 64 testdir 65 <autodir>/tests/ 66 configdir 67 <autodir>/config/ 68 site_testdir 69 <autodir>/site_tests/ 70 profdir 71 <autodir>/profilers/ 72 tmpdir 73 <autodir>/tmp/ 74 pkgdir 75 <autodir>/packages/ 76 resultdir 77 <autodir>/results/<jobtag> 78 toolsdir 79 <autodir>/tools/ 80 logging 81 logging_manager.LoggingManager object 82 profilers 83 the profilers object for this job 84 harness 85 the server harness object for this job 86 config 87 the job configuration for this job 88 drop_caches_between_iterations 89 drop the pagecache between each iteration 90 default_profile_only 91 default value for the test.execute profile_only paramete 92 """ 93 94 DEFAULT_LOG_FILENAME = "status" 95 WARNING_DISABLE_DELAY = 5 96 97 def __init__(self, control, options, drop_caches=True, 98 extra_copy_cmdline=None): 99 """ 100 Prepare a client side job object. 101 102 @param control: The control file (pathname of). 103 @param options: an object which includes: 104 jobtag: The job tag string (eg "default"). 105 cont: If this is the continuation of this job. 106 harness_type: An alternative server harness. [None] 107 use_external_logging: If true, the enable_external_logging 108 method will be called during construction. [False] 109 @param drop_caches: If true, utils.drop_caches() is called before and 110 between all tests. [True] 111 @param extra_copy_cmdline: list of additional /proc/cmdline arguments to 112 copy from the running kernel to all the installed kernels with 113 this job 114 """ 115 self._pre_record_init(control, options) 116 try: 117 self._post_record_init(control, options, drop_caches, 118 extra_copy_cmdline) 119 except Exception, err: 120 self.record( 121 'ABORT', None, None,'client.bin.job.__init__ failed: %s' % 122 str(err)) 123 raise 124 125 126 def _pre_record_init(self, control, options): 127 """ 128 Initialization function that should peform ONLY the required 129 setup so that the self.record() method works. 130 131 As of now self.record() needs self.resultdir, self.group_level, 132 self.log_filename, self.harness. 133 """ 134 self.autodir = os.environ['AUTODIR'] 135 self.resultdir = os.path.join(self.autodir, 'results', options.tag) 136 self.tmpdir = os.path.join(self.autodir, 'tmp') 137 138 if not os.path.exists(self.resultdir): 139 os.makedirs(self.resultdir) 140 141 if not options.cont: 142 self._cleanup_results_dir() 143 # Don't cleanup the tmp dir (which contains the lockfile) 144 # in the constructor, this would be a problem for multiple 145 # jobs starting at the same time on the same client. Instead 146 # do the delete at the server side. We simply create the tmp 147 # directory here if it does not already exist. 148 if not os.path.exists(self.tmpdir): 149 os.mkdir(self.tmpdir) 150 151 logging_manager.configure_logging( 152 client_logging_config.ClientLoggingConfig(), 153 results_dir=self.resultdir, 154 verbose=options.verbose) 155 logging.info('Writing results to %s', self.resultdir) 156 157 self.log_filename = self.DEFAULT_LOG_FILENAME 158 159 # init_group_level needs the state 160 self.control = os.path.realpath(control) 161 self._is_continuation = options.cont 162 self.state_file = self.control + '.state' 163 self.current_step_ancestry = [] 164 self.next_step_index = 0 165 self.testtag = '' 166 self._test_tag_prefix = '' 167 self._load_state() 168 169 self._init_group_level() 170 171 self.harness = harness.select(options.harness, self) 172 173 174 def _post_record_init(self, control, options, drop_caches, 175 extra_copy_cmdline): 176 """ 177 Perform job initialization not required by self.record(). 178 """ 179 self.bindir = os.path.join(self.autodir, 'bin') 180 self.libdir = os.path.join(self.autodir, 'lib') 181 self.testdir = os.path.join(self.autodir, 'tests') 182 self.configdir = os.path.join(self.autodir, 'config') 183 self.site_testdir = os.path.join(self.autodir, 'site_tests') 184 self.profdir = os.path.join(self.autodir, 'profilers') 185 self.toolsdir = os.path.join(self.autodir, 'tools') 186 187 self._init_drop_caches(drop_caches) 188 189 self._init_packages() 190 self.default_profile_only = self.get_state("__default_profile_only", 191 default=False) 192 self.run_test_cleanup = self.get_state("__run_test_cleanup", 193 default=True) 194 195 self.sysinfo = sysinfo.sysinfo(self.resultdir) 196 self._load_sysinfo_state() 197 198 self.last_boot_tag = self.get_state("__last_boot_tag", default=None) 199 self.tag = self.get_state("__job_tag", default=None) 200 201 if not options.cont: 202 if not os.path.exists(self.pkgdir): 203 os.mkdir(self.pkgdir) 204 205 results = os.path.join(self.autodir, 'results') 206 if not os.path.exists(results): 207 os.mkdir(results) 208 209 download = os.path.join(self.testdir, 'download') 210 if not os.path.exists(download): 211 os.mkdir(download) 212 213 os.makedirs(os.path.join(self.resultdir, 'analysis')) 214 215 shutil.copyfile(self.control, 216 os.path.join(self.resultdir, 'control')) 217 218 219 self.control = control 220 self.jobtag = options.tag 221 222 self.logging = logging_manager.get_logging_manager( 223 manage_stdout_and_stderr=True, redirect_fds=True) 224 self.logging.start_logging() 225 226 self.config = config.config(self) 227 self.profilers = profilers.profilers(self) 228 self.host = local_host.LocalHost(hostname=options.hostname) 229 self.autoserv_user = options.autoserv_user 230 231 self._init_bootloader() 232 233 self.sysinfo.log_per_reboot_data() 234 235 if not options.cont: 236 self.record('START', None, None) 237 self._increment_group_level() 238 239 self.harness.run_start() 240 241 if options.log: 242 self.enable_external_logging() 243 244 # load the max disk usage rate - default to no monitoring 245 self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0) 246 247 self._init_cmdline(extra_copy_cmdline) 248 249 250 def _init_drop_caches(self, drop_caches): 251 """ 252 Perform the drop caches initialization. 253 """ 254 self.drop_caches_between_iterations = True 255 self.drop_caches = drop_caches 256 if self.drop_caches: 257 logging.debug("Dropping caches") 258 utils.drop_caches() 259 260 261 def _init_bootloader(self): 262 """ 263 Perform boottool initialization. 264 """ 265 try: 266 tool = self.config_get('boottool.executable') 267 self.bootloader = boottool.boottool(tool) 268 except: 269 pass 270 271 272 def _init_packages(self): 273 """ 274 Perform the packages support initialization. 275 """ 276 self.pkgmgr = packages.PackageManager( 277 self.autodir, run_function_dargs={'timeout':3600}) 278 self.pkgdir = os.path.join(self.autodir, 'packages') 279 280 281 def _init_cmdline(self, extra_copy_cmdline): 282 """ 283 Initialize default cmdline for booted kernels in this job. 284 """ 285 copy_cmdline = set(['console']) 286 if extra_copy_cmdline is not None: 287 copy_cmdline.update(extra_copy_cmdline) 288 289 # extract console= and other args from cmdline and add them into the 290 # base args that we use for all kernels we install 291 cmdline = utils.read_one_line('/proc/cmdline') 292 kernel_args = [] 293 for karg in cmdline.split(): 294 for param in copy_cmdline: 295 if karg.startswith(param) and \ 296 (len(param) == len(karg) or karg[len(param)] == '='): 297 kernel_args.append(karg) 298 self.config_set('boot.default_args', ' '.join(kernel_args)) 299 300 301 def _cleanup_results_dir(self): 302 """Delete everything in resultsdir""" 303 assert os.path.exists(self.resultdir) 304 list_files = glob.glob('%s/*' % self.resultdir) 305 for f in list_files: 306 if os.path.isdir(f): 307 shutil.rmtree(f) 308 elif os.path.isfile(f): 309 os.remove(f) 310 311 312 def disable_warnings(self, warning_type): 313 self.record("INFO", None, None, 314 "disabling %s warnings" % warning_type, 315 {"warnings.disable": warning_type}) 316 time.sleep(self.WARNING_DISABLE_DELAY) 317 318 319 def enable_warnings(self, warning_type): 320 time.sleep(self.WARNING_DISABLE_DELAY) 321 self.record("INFO", None, None, 322 "enabling %s warnings" % warning_type, 323 {"warnings.enable": warning_type}) 324 325 326 def monitor_disk_usage(self, max_rate): 327 """\ 328 Signal that the job should monitor disk space usage on / 329 and generate a warning if a test uses up disk space at a 330 rate exceeding 'max_rate'. 331 332 Parameters: 333 max_rate - the maximium allowed rate of disk consumption 334 during a test, in MB/hour, or 0 to indicate 335 no limit. 336 """ 337 self.set_state('__monitor_disk', max_rate) 338 self.max_disk_usage_rate = max_rate 339 340 341 def relative_path(self, path): 342 """\ 343 Return a patch relative to the job results directory 344 """ 345 head = len(self.resultdir) + 1 # remove the / inbetween 346 return path[head:] 347 348 349 def control_get(self): 350 return self.control 351 352 353 def control_set(self, control): 354 self.control = os.path.abspath(control) 355 356 357 def harness_select(self, which): 358 self.harness = harness.select(which, self) 359 360 361 def config_set(self, name, value): 362 self.config.set(name, value) 363 364 365 def config_get(self, name): 366 return self.config.get(name) 367 368 369 def setup_dirs(self, results_dir, tmp_dir): 370 if not tmp_dir: 371 tmp_dir = os.path.join(self.tmpdir, 'build') 372 if not os.path.exists(tmp_dir): 373 os.mkdir(tmp_dir) 374 if not os.path.isdir(tmp_dir): 375 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 376 raise ValueError(e_msg) 377 378 # We label the first build "build" and then subsequent ones 379 # as "build.2", "build.3", etc. Whilst this is a little bit 380 # inconsistent, 99.9% of jobs will only have one build 381 # (that's not done as kernbench, sparse, or buildtest), 382 # so it works out much cleaner. One of life's comprimises. 383 if not results_dir: 384 results_dir = os.path.join(self.resultdir, 'build') 385 i = 2 386 while os.path.exists(results_dir): 387 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 388 i += 1 389 if not os.path.exists(results_dir): 390 os.mkdir(results_dir) 391 392 return (results_dir, tmp_dir) 393 394 395 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 396 kjob = None ): 397 """Summon a xen object""" 398 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 399 build_dir = 'xen' 400 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, 401 leave, kjob) 402 403 404 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 405 """Summon a kernel object""" 406 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 407 build_dir = 'linux' 408 return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir, 409 build_dir, leave) 410 411 412 def barrier(self, *args, **kwds): 413 """Create a barrier object""" 414 return barrier.barrier(*args, **kwds) 415 416 417 def install_pkg(self, name, pkg_type, install_dir): 418 ''' 419 This method is a simple wrapper around the actual package 420 installation method in the Packager class. This is used 421 internally by the profilers, deps and tests code. 422 name : name of the package (ex: sleeptest, dbench etc.) 423 pkg_type : Type of the package (ex: test, dep etc.) 424 install_dir : The directory in which the source is actually 425 untarred into. (ex: client/profilers/<name> for profilers) 426 ''' 427 if self.pkgmgr.repositories: 428 self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir) 429 430 431 def add_repository(self, repo_urls): 432 ''' 433 Adds the repository locations to the job so that packages 434 can be fetched from them when needed. The repository list 435 needs to be a string list 436 Ex: job.add_repository(['http://blah1','http://blah2']) 437 ''' 438 for repo_url in repo_urls: 439 self.pkgmgr.add_repository(repo_url) 440 441 # Fetch the packages' checksum file that contains the checksums 442 # of all the packages if it is not already fetched. The checksum 443 # is always fetched whenever a job is first started. This 444 # is not done in the job's constructor as we don't have the list of 445 # the repositories there (and obviously don't care about this file 446 # if we are not using the repos) 447 try: 448 checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir, 449 base_packages.CHECKSUM_FILE) 450 self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE, 451 checksum_file_path, use_checksum=False) 452 except error.PackageFetchError: 453 # packaging system might not be working in this case 454 # Silently fall back to the normal case 455 pass 456 457 458 def require_gcc(self): 459 """ 460 Test whether gcc is installed on the machine. 461 """ 462 # check if gcc is installed on the system. 463 try: 464 utils.system('which gcc') 465 except error.CmdError, e: 466 raise NotAvailableError('gcc is required by this job and is ' 467 'not available on the system') 468 469 470 def setup_dep(self, deps): 471 """Set up the dependencies for this test. 472 deps is a list of libraries required for this test. 473 """ 474 # Fetch the deps from the repositories and set them up. 475 for dep in deps: 476 dep_dir = os.path.join(self.autodir, 'deps', dep) 477 # Search for the dependency in the repositories if specified, 478 # else check locally. 479 try: 480 self.install_pkg(dep, 'dep', dep_dir) 481 except error.PackageInstallError: 482 # see if the dep is there locally 483 pass 484 485 # dep_dir might not exist if it is not fetched from the repos 486 if not os.path.exists(dep_dir): 487 raise error.TestError("Dependency %s does not exist" % dep) 488 489 os.chdir(dep_dir) 490 utils.system('./' + dep + '.py') 491 492 493 def _runtest(self, url, tag, args, dargs): 494 try: 495 l = lambda : test.runtest(self, url, tag, args, dargs) 496 pid = parallel.fork_start(self.resultdir, l) 497 parallel.fork_waitfor(self.resultdir, pid) 498 except error.TestBaseException: 499 # These are already classified with an error type (exit_status) 500 raise 501 except error.JobError: 502 raise # Caught further up and turned into an ABORT. 503 except Exception, e: 504 # Converts all other exceptions thrown by the test regardless 505 # of phase into a TestError(TestBaseException) subclass that 506 # reports them with their full stack trace. 507 raise error.UnhandledTestError(e) 508 509 510 @_run_test_complete_on_exit 511 def run_test(self, url, *args, **dargs): 512 """Summon a test object and run it. 513 514 tag 515 tag to add to testname 516 url 517 url of the test to run 518 """ 519 520 if not url: 521 raise TypeError("Test name is invalid. " 522 "Switched arguments?") 523 (group, testname) = self.pkgmgr.get_package_name(url, 'test') 524 namelen = len(testname) 525 dargs = dargs.copy() 526 tntag = dargs.pop('tag', None) 527 if tntag: # per-test tag is included in reported test name 528 testname += '.' + str(tntag) 529 run_number = self.get_run_number() 530 if run_number: 531 testname += '._%02d_' % run_number 532 self.set_run_number(run_number + 1) 533 if self._is_kernel_in_test_tag(): 534 testname += '.' + platform.release() 535 if self._test_tag_prefix: 536 testname += '.' + self._test_tag_prefix 537 if self.testtag: # job-level tag is included in reported test name 538 testname += '.' + self.testtag 539 subdir = testname 540 sdtag = dargs.pop('subdir_tag', None) 541 if sdtag: # subdir-only tag is not included in reports 542 subdir = subdir + '.' + str(sdtag) 543 tag = subdir[namelen+1:] # '' if none 544 545 outputdir = os.path.join(self.resultdir, subdir) 546 if os.path.exists(outputdir): 547 msg = ("%s already exists, test <%s> may have" 548 " already run with tag <%s>" 549 % (outputdir, testname, tag) ) 550 raise error.TestError(msg) 551 # NOTE: client/common_lib/test.py runtest() depends directory names 552 # being constructed the same way as in this code. 553 os.mkdir(outputdir) 554 555 def log_warning(reason): 556 self.record("WARN", subdir, testname, reason) 557 @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate) 558 def group_func(): 559 try: 560 self._runtest(url, tag, args, dargs) 561 except error.TestBaseException, detail: 562 # The error is already classified, record it properly. 563 self.record(detail.exit_status, subdir, testname, str(detail)) 564 raise 565 else: 566 self.record('GOOD', subdir, testname, 'completed successfully') 567 568 try: 569 self._rungroup(subdir, testname, group_func) 570 return True 571 except error.TestBaseException: 572 return False 573 # Any other exception here will be given to the caller 574 # 575 # NOTE: The only exception possible from the control file here 576 # is error.JobError as _runtest() turns all others into an 577 # UnhandledTestError that is caught above. 578 579 580 def _rungroup(self, subdir, testname, function, *args, **dargs): 581 """\ 582 subdir: 583 name of the group 584 testname: 585 name of the test to run, or support step 586 function: 587 subroutine to run 588 *args: 589 arguments for the function 590 591 Returns the result of the passed in function 592 """ 593 594 try: 595 self.record('START', subdir, testname) 596 self._increment_group_level() 597 result = function(*args, **dargs) 598 self._decrement_group_level() 599 self.record('END GOOD', subdir, testname) 600 return result 601 except error.TestBaseException, e: 602 self._decrement_group_level() 603 self.record('END %s' % e.exit_status, subdir, testname) 604 raise 605 except error.JobError, e: 606 self._decrement_group_level() 607 self.record('END ABORT', subdir, testname) 608 raise 609 except Exception, e: 610 # This should only ever happen due to a bug in the given 611 # function's code. The common case of being called by 612 # run_test() will never reach this. If a control file called 613 # run_group() itself, bugs in its function will be caught 614 # here. 615 self._decrement_group_level() 616 err_msg = str(e) + '\n' + traceback.format_exc() 617 self.record('END ERROR', subdir, testname, err_msg) 618 raise 619 620 621 def run_group(self, function, tag=None, **dargs): 622 """ 623 Run a function nested within a group level. 624 625 function: 626 Callable to run. 627 tag: 628 An optional tag name for the group. If None (default) 629 function.__name__ will be used. 630 **dargs: 631 Named arguments for the function. 632 """ 633 if tag: 634 name = tag 635 else: 636 name = function.__name__ 637 638 try: 639 return self._rungroup(subdir=None, testname=name, 640 function=function, **dargs) 641 except (SystemExit, error.TestBaseException): 642 raise 643 # If there was a different exception, turn it into a TestError. 644 # It will be caught by step_engine or _run_step_fn. 645 except Exception, e: 646 raise error.UnhandledTestError(e) 647 648 649 _RUN_NUMBER_STATE = '__run_number' 650 def get_run_number(self): 651 """Get the run number or 0 if no run number has been set.""" 652 return self.get_state(self._RUN_NUMBER_STATE, default=0) 653 654 655 def set_run_number(self, number): 656 """If the run number is non-zero it will be in the output dir name.""" 657 self.set_state(self._RUN_NUMBER_STATE, number) 658 659 660 _KERNEL_IN_TAG_STATE = '__kernel_version_in_test_tag' 661 def _is_kernel_in_test_tag(self): 662 """Boolean: should the kernel version be included in the test tag.""" 663 return self.get_state(self._KERNEL_IN_TAG_STATE, default=False) 664 665 666 def show_kernel_in_test_tag(self, value=True): 667 """If True, the kernel version at test time will prefix test tags.""" 668 self.set_state(self._KERNEL_IN_TAG_STATE, value) 669 670 671 def set_test_tag(self, tag=''): 672 """Set tag to be added to test name of all following run_test steps.""" 673 self.testtag = tag 674 675 676 def set_test_tag_prefix(self, prefix): 677 """Set a prefix string to prepend to all future run_test steps. 678 679 Args: 680 prefix: A string to prepend to any run_test() step tags separated by 681 a '.'; use the empty string '' to clear it. 682 """ 683 self._test_tag_prefix = prefix 684 685 686 def cpu_count(self): 687 return utils.count_cpus() # use total system count 688 689 690 def start_reboot(self): 691 self.record('START', None, 'reboot') 692 self._increment_group_level() 693 self.record('GOOD', None, 'reboot.start') 694 695 696 def _record_reboot_failure(self, subdir, operation, status, 697 running_id=None): 698 self.record("ABORT", subdir, operation, status) 699 self._decrement_group_level() 700 if not running_id: 701 running_id = utils.running_os_ident() 702 kernel = {"kernel": running_id.split("::")[0]} 703 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) 704 705 706 def _check_post_reboot(self, subdir, running_id=None): 707 """ 708 Function to perform post boot checks such as if the mounted 709 partition list has changed across reboots. 710 """ 711 partition_list = partition_lib.get_partition_list(self) 712 mount_info = set((p.device, p.get_mountpoint()) for p in partition_list) 713 old_mount_info = self.get_state("__mount_info") 714 if mount_info != old_mount_info: 715 new_entries = mount_info - old_mount_info 716 old_entries = old_mount_info - mount_info 717 description = ("mounted partitions are different after reboot " 718 "(old entries: %s, new entries: %s)" % 719 (old_entries, new_entries)) 720 self._record_reboot_failure(subdir, "reboot.verify_config", 721 description, running_id=running_id) 722 raise error.JobError("Reboot failed: %s" % description) 723 724 725 def end_reboot(self, subdir, kernel, patches, running_id=None): 726 self._check_post_reboot(subdir, running_id=running_id) 727 728 # strip ::<timestamp> from the kernel version if present 729 kernel = kernel.split("::")[0] 730 kernel_info = {"kernel": kernel} 731 for i, patch in enumerate(patches): 732 kernel_info["patch%d" % i] = patch 733 self._decrement_group_level() 734 self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info) 735 736 737 def end_reboot_and_verify(self, expected_when, expected_id, subdir, 738 type='src', patches=[]): 739 """ Check the passed kernel identifier against the command line 740 and the running kernel, abort the job on missmatch. """ 741 742 logging.info("POST BOOT: checking booted kernel " 743 "mark=%d identity='%s' type='%s'", 744 expected_when, expected_id, type) 745 746 running_id = utils.running_os_ident() 747 748 cmdline = utils.read_one_line("/proc/cmdline") 749 750 find_sum = re.compile(r'.*IDENT=(\d+)') 751 m = find_sum.match(cmdline) 752 cmdline_when = -1 753 if m: 754 cmdline_when = int(m.groups()[0]) 755 756 # We have all the facts, see if they indicate we 757 # booted the requested kernel or not. 758 bad = False 759 if (type == 'src' and expected_id != running_id or 760 type == 'rpm' and 761 not running_id.startswith(expected_id + '::')): 762 logging.error("Kernel identifier mismatch") 763 bad = True 764 if expected_when != cmdline_when: 765 logging.error("Kernel command line mismatch") 766 bad = True 767 768 if bad: 769 logging.error(" Expected Ident: " + expected_id) 770 logging.error(" Running Ident: " + running_id) 771 logging.error(" Expected Mark: %d", expected_when) 772 logging.error("Command Line Mark: %d", cmdline_when) 773 logging.error(" Command Line: " + cmdline) 774 775 self._record_reboot_failure(subdir, "reboot.verify", "boot failure", 776 running_id=running_id) 777 raise error.JobError("Reboot returned with the wrong kernel") 778 779 self.record('GOOD', subdir, 'reboot.verify', 780 utils.running_os_full_version()) 781 self.end_reboot(subdir, expected_id, patches, running_id=running_id) 782 783 784 def partition(self, device, loop_size=0, mountpoint=None): 785 """ 786 Work with a machine partition 787 788 @param device: e.g. /dev/sda2, /dev/sdb1 etc... 789 @param mountpoint: Specify a directory to mount to. If not specified 790 autotest tmp directory will be used. 791 @param loop_size: Size of loopback device (in MB). Defaults to 0. 792 793 @return: A L{client.bin.partition.partition} object 794 """ 795 796 if not mountpoint: 797 mountpoint = self.tmpdir 798 return partition_lib.partition(self, device, loop_size, mountpoint) 799 800 @utils.deprecated 801 def filesystem(self, device, mountpoint=None, loop_size=0): 802 """ Same as partition 803 804 @deprecated: Use partition method instead 805 """ 806 return self.partition(device, loop_size, mountpoint) 807 808 809 def enable_external_logging(self): 810 pass 811 812 813 def disable_external_logging(self): 814 pass 815 816 817 def set_default_profile_only(self, val): 818 """ Set the default_profile_only mode. """ 819 self.set_state("__default_profile_only", val) 820 self.default_profile_only = val 821 822 823 def enable_test_cleanup(self): 824 """ By default tests run test.cleanup """ 825 self.set_state("__run_test_cleanup", True) 826 self.run_test_cleanup = True 827 828 829 def disable_test_cleanup(self): 830 """ By default tests do not run test.cleanup """ 831 self.set_state("__run_test_cleanup", False) 832 self.run_test_cleanup = False 833 834 835 def default_test_cleanup(self, val): 836 if not self._is_continuation: 837 self.set_state("__run_test_cleanup", val) 838 self.run_test_cleanup = val 839 840 841 def default_boot_tag(self, tag): 842 if not self._is_continuation: 843 self.set_state("__last_boot_tag", tag) 844 self.last_boot_tag = tag 845 846 847 def default_tag(self, tag): 848 """Allows the scheduler's job tag to be passed in from autoserv.""" 849 if not self._is_continuation: 850 self.set_state("__job_tag", tag) 851 self.tag = tag 852 853 854 def reboot_setup(self): 855 # save the partition list and their mount point and compare it 856 # after reboot 857 partition_list = partition_lib.get_partition_list(self) 858 mount_info = set((p.device, p.get_mountpoint()) for p in partition_list) 859 self.set_state("__mount_info", mount_info) 860 861 862 def reboot(self, tag=LAST_BOOT_TAG): 863 if tag == LAST_BOOT_TAG: 864 tag = self.last_boot_tag 865 else: 866 self.set_state("__last_boot_tag", tag) 867 self.last_boot_tag = tag 868 869 self.reboot_setup() 870 self.harness.run_reboot() 871 default = self.config_get('boot.set_default') 872 if default: 873 self.bootloader.set_default(tag) 874 else: 875 self.bootloader.boot_once(tag) 876 877 # HACK: using this as a module sometimes hangs shutdown, so if it's 878 # installed unload it first 879 utils.system("modprobe -r netconsole", ignore_status=True) 880 881 # sync first, so that a sync during shutdown doesn't time out 882 utils.system("sync; sync", ignore_status=True) 883 884 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 885 self.quit() 886 887 888 def noop(self, text): 889 logging.info("job: noop: " + text) 890 891 892 @_run_test_complete_on_exit 893 def parallel(self, *tasklist): 894 """Run tasks in parallel""" 895 896 pids = [] 897 old_log_filename = self.log_filename 898 for i, task in enumerate(tasklist): 899 assert isinstance(task, (tuple, list)) 900 self.log_filename = old_log_filename + (".%d" % i) 901 task_func = lambda: task[0](*task[1:]) 902 pids.append(parallel.fork_start(self.resultdir, task_func)) 903 904 old_log_path = os.path.join(self.resultdir, old_log_filename) 905 old_log = open(old_log_path, "a") 906 exceptions = [] 907 for i, pid in enumerate(pids): 908 # wait for the task to finish 909 try: 910 parallel.fork_waitfor(self.resultdir, pid) 911 except Exception, e: 912 exceptions.append(e) 913 # copy the logs from the subtask into the main log 914 new_log_path = old_log_path + (".%d" % i) 915 if os.path.exists(new_log_path): 916 new_log = open(new_log_path) 917 old_log.write(new_log.read()) 918 new_log.close() 919 old_log.flush() 920 os.remove(new_log_path) 921 old_log.close() 922 923 self.log_filename = old_log_filename 924 925 # handle any exceptions raised by the parallel tasks 926 if exceptions: 927 msg = "%d task(s) failed in job.parallel" % len(exceptions) 928 raise error.JobError(msg) 929 930 931 def quit(self): 932 # XXX: should have a better name. 933 self.harness.run_pause() 934 raise error.JobContinue("more to come") 935 936 937 def complete(self, status): 938 """Clean up and exit""" 939 # We are about to exit 'complete' so clean up the control file. 940 dest = os.path.join(self.resultdir, os.path.basename(self.state_file)) 941 shutil.move(self.state_file, dest) 942 943 self.harness.run_complete() 944 self.disable_external_logging() 945 sys.exit(status) 946 947 948 def set_state(self, var, val): 949 # Deep copies make sure that the state can't be altered 950 # without it being re-written. Perf wise, deep copies 951 # are overshadowed by pickling/loading. 952 self.state[var] = copy.deepcopy(val) 953 outfile = open(self.state_file, 'wb') 954 try: 955 pickle.dump(self.state, outfile, pickle.HIGHEST_PROTOCOL) 956 finally: 957 outfile.close() 958 logging.debug("Persistent state variable %s now set to %r", var, val) 959 960 961 def _load_state(self): 962 if hasattr(self, 'state'): 963 raise RuntimeError('state already exists') 964 infile = None 965 try: 966 try: 967 infile = open(self.state_file, 'rb') 968 self.state = pickle.load(infile) 969 logging.info('Loaded state from %s', self.state_file) 970 except IOError: 971 self.state = {} 972 initialize = True 973 finally: 974 if infile: 975 infile.close() 976 977 initialize = '__steps' not in self.state 978 if not (self._is_continuation or initialize): 979 raise RuntimeError('Loaded state must contain __steps or be a ' 980 'continuation.') 981 982 if initialize: 983 logging.info('Initializing the state engine') 984 self.set_state('__steps', []) # writes pickle file 985 986 987 def get_state(self, var, default=NO_DEFAULT): 988 if var in self.state or default == NO_DEFAULT: 989 val = self.state[var] 990 else: 991 val = default 992 return copy.deepcopy(val) 993 994 995 def __create_step_tuple(self, fn, args, dargs): 996 # Legacy code passes in an array where the first arg is 997 # the function or its name. 998 if isinstance(fn, list): 999 assert(len(args) == 0) 1000 assert(len(dargs) == 0) 1001 args = fn[1:] 1002 fn = fn[0] 1003 # Pickling actual functions is hairy, thus we have to call 1004 # them by name. Unfortunately, this means only functions 1005 # defined globally can be used as a next step. 1006 if callable(fn): 1007 fn = fn.__name__ 1008 if not isinstance(fn, types.StringTypes): 1009 raise StepError("Next steps must be functions or " 1010 "strings containing the function name") 1011 ancestry = copy.copy(self.current_step_ancestry) 1012 return (ancestry, fn, args, dargs) 1013 1014 1015 def next_step_append(self, fn, *args, **dargs): 1016 """Define the next step and place it at the end""" 1017 steps = self.get_state('__steps') 1018 steps.append(self.__create_step_tuple(fn, args, dargs)) 1019 self.set_state('__steps', steps) 1020 1021 1022 def next_step(self, fn, *args, **dargs): 1023 """Create a new step and place it after any steps added 1024 while running the current step but before any steps added in 1025 previous steps""" 1026 steps = self.get_state('__steps') 1027 steps.insert(self.next_step_index, 1028 self.__create_step_tuple(fn, args, dargs)) 1029 self.next_step_index += 1 1030 self.set_state('__steps', steps) 1031 1032 1033 def next_step_prepend(self, fn, *args, **dargs): 1034 """Insert a new step, executing first""" 1035 steps = self.get_state('__steps') 1036 steps.insert(0, self.__create_step_tuple(fn, args, dargs)) 1037 self.next_step_index += 1 1038 self.set_state('__steps', steps) 1039 1040 1041 def _run_step_fn(self, local_vars, fn, args, dargs): 1042 """Run a (step) function within the given context""" 1043 1044 local_vars['__args'] = args 1045 local_vars['__dargs'] = dargs 1046 try: 1047 exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars) 1048 return local_vars['__ret'] 1049 except SystemExit: 1050 raise # Send error.JobContinue and JobComplete on up to runjob. 1051 except error.TestNAError, detail: 1052 self.record(detail.exit_status, None, fn, str(detail)) 1053 except Exception, detail: 1054 raise error.UnhandledJobError(detail) 1055 1056 1057 def _create_frame(self, global_vars, ancestry, fn_name): 1058 """Set up the environment like it would have been when this 1059 function was first defined. 1060 1061 Child step engine 'implementations' must have 'return locals()' 1062 at end end of their steps. Because of this, we can call the 1063 parent function and get back all child functions (i.e. those 1064 defined within it). 1065 1066 Unfortunately, the call stack of the function calling 1067 job.next_step might have been deeper than the function it 1068 added. In order to make sure that the environment is what it 1069 should be, we need to then pop off the frames we built until 1070 we find the frame where the function was first defined.""" 1071 1072 # The copies ensure that the parent frames are not modified 1073 # while building child frames. This matters if we then 1074 # pop some frames in the next part of this function. 1075 current_frame = copy.copy(global_vars) 1076 frames = [current_frame] 1077 for steps_fn_name in ancestry: 1078 ret = self._run_step_fn(current_frame, steps_fn_name, [], {}) 1079 current_frame = copy.copy(ret) 1080 frames.append(current_frame) 1081 1082 # Walk up the stack frames until we find the place fn_name was defined. 1083 while len(frames) > 2: 1084 if fn_name not in frames[-2]: 1085 break 1086 if frames[-2][fn_name] != frames[-1][fn_name]: 1087 break 1088 frames.pop() 1089 ancestry.pop() 1090 1091 return (frames[-1], ancestry) 1092 1093 1094 def _add_step_init(self, local_vars, current_function): 1095 """If the function returned a dictionary that includes a 1096 function named 'step_init', prepend it to our list of steps. 1097 This will only get run the first time a function with a nested 1098 use of the step engine is run.""" 1099 1100 if (isinstance(local_vars, dict) and 1101 'step_init' in local_vars and 1102 callable(local_vars['step_init'])): 1103 # The init step is a child of the function 1104 # we were just running. 1105 self.current_step_ancestry.append(current_function) 1106 self.next_step_prepend('step_init') 1107 1108 1109 def step_engine(self): 1110 """The multi-run engine used when the control file defines step_init. 1111 1112 Does the next step. 1113 """ 1114 1115 # Set up the environment and then interpret the control file. 1116 # Some control files will have code outside of functions, 1117 # which means we need to have our state engine initialized 1118 # before reading in the file. 1119 global_control_vars = {'job': self} 1120 exec(JOB_PREAMBLE, global_control_vars, global_control_vars) 1121 try: 1122 execfile(self.control, global_control_vars, global_control_vars) 1123 except error.TestNAError, detail: 1124 self.record(detail.exit_status, None, self.control, str(detail)) 1125 except SystemExit: 1126 raise # Send error.JobContinue and JobComplete on up to runjob. 1127 except Exception, detail: 1128 # Syntax errors or other general Python exceptions coming out of 1129 # the top level of the control file itself go through here. 1130 raise error.UnhandledJobError(detail) 1131 1132 # If we loaded in a mid-job state file, then we presumably 1133 # know what steps we have yet to run. 1134 if not self._is_continuation: 1135 if 'step_init' in global_control_vars: 1136 self.next_step(global_control_vars['step_init']) 1137 1138 # Iterate through the steps. If we reboot, we'll simply 1139 # continue iterating on the next step. 1140 while len(self.get_state('__steps')) > 0: 1141 steps = self.get_state('__steps') 1142 (ancestry, fn_name, args, dargs) = steps.pop(0) 1143 self.set_state('__steps', steps) 1144 1145 self.next_step_index = 0 1146 ret = self._create_frame(global_control_vars, ancestry, fn_name) 1147 local_vars, self.current_step_ancestry = ret 1148 local_vars = self._run_step_fn(local_vars, fn_name, args, dargs) 1149 self._add_step_init(local_vars, fn_name) 1150 1151 1152 def add_sysinfo_command(self, command, logfile=None, on_every_test=False): 1153 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile), 1154 on_every_test) 1155 1156 1157 def add_sysinfo_logfile(self, file, on_every_test=False): 1158 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) 1159 1160 1161 def _add_sysinfo_loggable(self, loggable, on_every_test): 1162 if on_every_test: 1163 self.sysinfo.test_loggables.add(loggable) 1164 else: 1165 self.sysinfo.boot_loggables.add(loggable) 1166 self._save_sysinfo_state() 1167 1168 1169 def _load_sysinfo_state(self): 1170 state = self.get_state("__sysinfo", None) 1171 if state: 1172 self.sysinfo.deserialize(state) 1173 1174 1175 def _save_sysinfo_state(self): 1176 state = self.sysinfo.serialize() 1177 self.set_state("__sysinfo", state) 1178 1179 1180 def _init_group_level(self): 1181 self.group_level = self.get_state("__group_level", default=0) 1182 1183 1184 def _increment_group_level(self): 1185 self.group_level += 1 1186 self.set_state("__group_level", self.group_level) 1187 1188 1189 def _decrement_group_level(self): 1190 self.group_level -= 1 1191 self.set_state("__group_level", self.group_level) 1192 1193 1194 def record(self, status_code, subdir, operation, status = '', 1195 optional_fields=None): 1196 """ 1197 Record job-level status 1198 1199 The intent is to make this file both machine parseable and 1200 human readable. That involves a little more complexity, but 1201 really isn't all that bad ;-) 1202 1203 Format is <status code>\t<subdir>\t<operation>\t<status> 1204 1205 status code: (GOOD|WARN|FAIL|ABORT) 1206 or START 1207 or END (GOOD|WARN|FAIL|ABORT) 1208 1209 subdir: MUST be a relevant subdirectory in the results, 1210 or None, which will be represented as '----' 1211 1212 operation: description of what you ran (e.g. "dbench", or 1213 "mkfs -t foobar /dev/sda9") 1214 1215 status: error message or "completed sucessfully" 1216 1217 ------------------------------------------------------------ 1218 1219 Initial tabs indicate indent levels for grouping, and is 1220 governed by self.group_level 1221 1222 multiline messages have secondary lines prefaced by a double 1223 space (' ') 1224 """ 1225 1226 if subdir: 1227 if re.match(r'[\n\t]', subdir): 1228 raise ValueError("Invalid character in subdir string") 1229 substr = subdir 1230 else: 1231 substr = '----' 1232 1233 if not log.is_valid_status(status_code): 1234 raise ValueError("Invalid status code supplied: %s" % status_code) 1235 if not operation: 1236 operation = '----' 1237 1238 if re.match(r'[\n\t]', operation): 1239 raise ValueError("Invalid character in operation string") 1240 operation = operation.rstrip() 1241 1242 if not optional_fields: 1243 optional_fields = {} 1244 1245 status = status.rstrip() 1246 status = re.sub(r"\t", " ", status) 1247 # Ensure any continuation lines are marked so we can 1248 # detect them in the status file to ensure it is parsable. 1249 status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", status) 1250 1251 # Generate timestamps for inclusion in the logs 1252 epoch_time = int(time.time()) # seconds since epoch, in UTC 1253 local_time = time.localtime(epoch_time) 1254 optional_fields["timestamp"] = str(epoch_time) 1255 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", 1256 local_time) 1257 1258 fields = [status_code, substr, operation] 1259 fields += ["%s=%s" % x for x in optional_fields.iteritems()] 1260 fields.append(status) 1261 1262 msg = '\t'.join(str(x) for x in fields) 1263 msg = '\t' * self.group_level + msg 1264 1265 msg_tag = "" 1266 if "." in self.log_filename: 1267 msg_tag = self.log_filename.split(".", 1)[1] 1268 1269 self.harness.test_status_detail(status_code, substr, operation, status, 1270 msg_tag) 1271 self.harness.test_status(msg, msg_tag) 1272 1273 # log to stdout (if enabled) 1274 #if self.log_filename == self.DEFAULT_LOG_FILENAME: 1275 logging.info(msg) 1276 1277 # log to the "root" status log 1278 status_file = os.path.join(self.resultdir, self.log_filename) 1279 open(status_file, "a").write(msg + "\n") 1280 1281 # log to the subdir status log (if subdir is set) 1282 if subdir: 1283 dir = os.path.join(self.resultdir, subdir) 1284 status_file = os.path.join(dir, self.DEFAULT_LOG_FILENAME) 1285 open(status_file, "a").write(msg + "\n") 1286 1287 1288class disk_usage_monitor: 1289 def __init__(self, logging_func, device, max_mb_per_hour): 1290 self.func = logging_func 1291 self.device = device 1292 self.max_mb_per_hour = max_mb_per_hour 1293 1294 1295 def start(self): 1296 self.initial_space = utils.freespace(self.device) 1297 self.start_time = time.time() 1298 1299 1300 def stop(self): 1301 # if no maximum usage rate was set, we don't need to 1302 # generate any warnings 1303 if not self.max_mb_per_hour: 1304 return 1305 1306 final_space = utils.freespace(self.device) 1307 used_space = self.initial_space - final_space 1308 stop_time = time.time() 1309 total_time = stop_time - self.start_time 1310 # round up the time to one minute, to keep extremely short 1311 # tests from generating false positives due to short, badly 1312 # timed bursts of activity 1313 total_time = max(total_time, 60.0) 1314 1315 # determine the usage rate 1316 bytes_per_sec = used_space / total_time 1317 mb_per_sec = bytes_per_sec / 1024**2 1318 mb_per_hour = mb_per_sec * 60 * 60 1319 1320 if mb_per_hour > self.max_mb_per_hour: 1321 msg = ("disk space on %s was consumed at a rate of %.2f MB/hour") 1322 msg %= (self.device, mb_per_hour) 1323 self.func(msg) 1324 1325 1326 @classmethod 1327 def watch(cls, *monitor_args, **monitor_dargs): 1328 """ Generic decorator to wrap a function call with the 1329 standard create-monitor -> start -> call -> stop idiom.""" 1330 def decorator(func): 1331 def watched_func(*args, **dargs): 1332 monitor = cls(*monitor_args, **monitor_dargs) 1333 monitor.start() 1334 try: 1335 func(*args, **dargs) 1336 finally: 1337 monitor.stop() 1338 return watched_func 1339 return decorator 1340 1341 1342def runjob(control, options): 1343 """ 1344 Run a job using the given control file. 1345 1346 This is the main interface to this module. 1347 1348 @see base_job.__init__ for parameter info. 1349 """ 1350 control = os.path.abspath(control) 1351 state = control + '.state' 1352 # Ensure state file is cleaned up before the job starts to run if autotest 1353 # is not running with the --continue flag 1354 if not options.cont and os.path.isfile(state): 1355 logging.debug('Cleaning up previously found state file') 1356 os.remove(state) 1357 1358 # instantiate the job object ready for the control file. 1359 myjob = None 1360 try: 1361 # Check that the control file is valid 1362 if not os.path.exists(control): 1363 raise error.JobError(control + ": control file not found") 1364 1365 # When continuing, the job is complete when there is no 1366 # state file, ensure we don't try and continue. 1367 if options.cont and not os.path.exists(state): 1368 raise error.JobComplete("all done") 1369 1370 myjob = job(control, options) 1371 1372 # Load in the users control file, may do any one of: 1373 # 1) execute in toto 1374 # 2) define steps, and select the first via next_step() 1375 myjob.step_engine() 1376 1377 except error.JobContinue: 1378 sys.exit(5) 1379 1380 except error.JobComplete: 1381 sys.exit(1) 1382 1383 except error.JobError, instance: 1384 logging.error("JOB ERROR: " + instance.args[0]) 1385 if myjob: 1386 command = None 1387 if len(instance.args) > 1: 1388 command = instance.args[1] 1389 myjob.record('ABORT', None, command, instance.args[0]) 1390 myjob._decrement_group_level() 1391 myjob.record('END ABORT', None, None, instance.args[0]) 1392 assert (myjob.group_level == 0), ('myjob.group_level must be 0,' 1393 ' not %d' % myjob.group_level) 1394 myjob.complete(1) 1395 else: 1396 sys.exit(1) 1397 1398 except Exception, e: 1399 # NOTE: job._run_step_fn and job.step_engine will turn things into 1400 # a JobError for us. If we get here, its likely an autotest bug. 1401 msg = str(e) + '\n' + traceback.format_exc() 1402 logging.critical("JOB ERROR (autotest bug?): " + msg) 1403 if myjob: 1404 myjob._decrement_group_level() 1405 myjob.record('END ABORT', None, None, msg) 1406 assert(myjob.group_level == 0) 1407 myjob.complete(1) 1408 else: 1409 sys.exit(1) 1410 1411 # If we get here, then we assume the job is complete and good. 1412 myjob._decrement_group_level() 1413 myjob.record('END GOOD', None, None) 1414 assert(myjob.group_level == 0) 1415 1416 myjob.complete(0) 1417 1418 1419site_job = utils.import_site_class( 1420 __file__, "autotest_lib.client.bin.site_job", "site_job", base_job) 1421 1422class job(site_job): 1423 pass 1424