job.py revision ce955fcf58207d795ff1fe0001851b8b3728452c
"""The main job wrapper

This is the core infrastructure.

Copyright Andy Whitcroft, Martin J. Bligh 2006
"""

import copy, os, platform, re, shutil, sys, time, traceback, types, glob
import logging
import cPickle as pickle
from autotest_lib.client.bin import client_logging_config
from autotest_lib.client.bin import utils, parallel, kernel, xen
from autotest_lib.client.bin import profilers, boottool, harness
from autotest_lib.client.bin import config, sysinfo, test, local_host
from autotest_lib.client.bin import partition as partition_lib
from autotest_lib.client.common_lib import error, barrier, log, logging_manager
from autotest_lib.client.common_lib import base_packages, packages

# Unique sentinel objects: they compare equal only to themselves, so callers
# can distinguish "no value supplied" from legitimate values such as None.
LAST_BOOT_TAG = object()
NO_DEFAULT = object()
# Code prepended to every control file before it is exec'd, so control files
# can use the error classes and utils helpers without importing them.
JOB_PREAMBLE = """
from autotest_lib.client.common_lib.error import *
from autotest_lib.client.bin.utils import *
"""


class StepError(error.AutotestError):
    # Raised when a step passed to the step engine is neither a callable
    # nor the name of one.
    pass

class NotAvailableError(error.AutotestError):
    # Raised when a tool required by the job (e.g. gcc) is not installed.
    pass



def _run_test_complete_on_exit(f):
    """Decorator for job methods that automatically calls
    self.harness.run_test_complete when the method exits, if appropriate."""
    def wrapped(self, *args, **dargs):
        try:
            return f(self, *args, **dargs)
        finally:
            # Only notify the harness when we are logging to the default
            # status file (i.e. not inside a parallel subtask, which swaps
            # log_filename to a per-task name).
            if self.log_filename == self.DEFAULT_LOG_FILENAME:
                self.harness.run_test_complete()
                if self.drop_caches:
                    logging.debug("Dropping caches")
                    utils.drop_caches()
    # Preserve the wrapped function's metadata by hand (pre-functools.wraps
    # style, consistent with the rest of this Python 2 codebase).
    wrapped.__name__ = f.__name__
    wrapped.__doc__ = f.__doc__
    wrapped.__dict__.update(f.__dict__)
    return wrapped


class base_job(object):
    """The actual job against which we do everything.

    Properties:
            autodir
                    The top level autotest directory (/usr/local/autotest).
                    Comes from os.environ['AUTODIR'].
            bindir
                    <autodir>/bin/
            libdir
                    <autodir>/lib/
            testdir
                    <autodir>/tests/
            configdir
                    <autodir>/config/
            site_testdir
                    <autodir>/site_tests/
            profdir
                    <autodir>/profilers/
            tmpdir
                    <autodir>/tmp/
            pkgdir
                    <autodir>/packages/
            resultdir
                    <autodir>/results/<jobtag>
            toolsdir
                    <autodir>/tools/
            logging
                    logging_manager.LoggingManager object
            profilers
                    the profilers object for this job
            harness
                    the server harness object for this job
            config
                    the job configuration for this job
            drop_caches_between_iterations
                    drop the pagecache between each iteration
    """

    # Name of the default status log; parallel subtasks append ".<i>".
    DEFAULT_LOG_FILENAME = "status"
    # Seconds slept around warning enable/disable so the timestamped
    # disable/enable markers safely bracket any in-flight warnings.
    WARNING_DISABLE_DELAY = 5

    def __init__(self, control, options, drop_caches=True,
                 extra_copy_cmdline=None):
        """
        Prepare a client side job object.

        @param control: The control file (pathname of).
        @param options: an object which includes:
                jobtag: The job tag string (eg "default").
                cont: If this is the continuation of this job.
                harness_type: An alternative server harness.  [None]
                use_external_logging: If true, the enable_external_logging
                      method will be called during construction.  [False]
        @param drop_caches: If true, utils.drop_caches() is called before and
                between all tests.  [True]
        @param extra_copy_cmdline: list of additional /proc/cmdline arguments to
                copy from the running kernel to all the installed kernels with
                this job
        """
        self.autodir = os.environ['AUTODIR']
        self.bindir = os.path.join(self.autodir, 'bin')
        self.libdir = os.path.join(self.autodir, 'lib')
        self.testdir = os.path.join(self.autodir, 'tests')
        self.configdir = os.path.join(self.autodir, 'config')
        self.site_testdir = os.path.join(self.autodir, 'site_tests')
        self.profdir = os.path.join(self.autodir, 'profilers')
        self.tmpdir = os.path.join(self.autodir, 'tmp')
        self.toolsdir = os.path.join(self.autodir, 'tools')
        self.resultdir = os.path.join(self.autodir, 'results', options.tag)

        if not os.path.exists(self.resultdir):
            os.makedirs(self.resultdir)

        # A fresh (non-continuation) job starts from an empty results dir.
        if not options.cont:
            self._cleanup_results_dir()

        logging_manager.configure_logging(
            client_logging_config.ClientLoggingConfig(),
            results_dir=self.resultdir,
            verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)

        self.drop_caches_between_iterations = False
        self.drop_caches = drop_caches
        if self.drop_caches:
            logging.debug("Dropping caches")
            utils.drop_caches()

        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        # Persistent, pickled state lives next to the control file so a job
        # can resume (e.g. across reboots) from where it left off.
        self.state_file = self.control + '.state'
        self.current_step_ancestry = []
        self.next_step_index = 0
        self.testtag = ''
        self._test_tag_prefix = ''

        self._load_state()
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':3600})
        self.pkgdir = os.path.join(self.autodir, 'packages')
        self.run_test_cleanup = self.get_state("__run_test_cleanup",
                                               default=True)

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        self.last_boot_tag = self.get_state("__last_boot_tag", default=None)
        self.tag = self.get_state("__job_tag", default=None)

        if not options.cont:
            """
            Don't cleanup the tmp dir (which contains the lockfile)
            in the constructor, this would be a problem for multiple
            jobs starting at the same time on the same client. Instead
            do the delete at the server side. We simply create the tmp
            directory here if it does not already exist.
            """
            if not os.path.exists(self.tmpdir):
                os.mkdir(self.tmpdir)

            if not os.path.exists(self.pkgdir):
                os.mkdir(self.pkgdir)

            results = os.path.join(self.autodir, 'results')
            if not os.path.exists(results):
                os.mkdir(results)

            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)

            os.makedirs(os.path.join(self.resultdir, 'analysis'))

            # Keep a copy of the control file with the results for later
            # reference.
            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))


        # Note: from here on self.control is the path as given by the caller,
        # not the realpath computed above.
        self.control = control
        self.jobtag = options.tag
        self.log_filename = self.DEFAULT_LOG_FILENAME

        self.logging = logging_manager.get_logging_manager(
            manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self._init_group_level()

        self.config = config.config(self)
        self.harness = harness.select(options.harness, self)
        self.profilers = profilers.profilers(self)
        self.host = local_host.LocalHost(hostname=options.hostname)

        # NOTE(review): bare except silently ignores any boottool setup
        # failure, leaving self.bootloader unset; later reboot() calls would
        # then raise AttributeError.  Presumably intentional best-effort for
        # hosts without boottool -- confirm before tightening.
        try:
            tool = self.config_get('boottool.executable')
            self.bootloader = boottool.boottool(tool)
        except:
            pass

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)
            self._increment_group_level()

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        # load the max disk usage rate - default to no monitoring
        self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0)

        copy_cmdline = set(['console'])
        if extra_copy_cmdline is not None:
            copy_cmdline.update(extra_copy_cmdline)

        # extract console= and other args from cmdline and add them into the
        # base args that we use for all kernels we install
        cmdline = utils.read_one_line('/proc/cmdline')
        kernel_args = []
        for karg in cmdline.split():
            for param in copy_cmdline:
                # Match "param" exactly or "param=..." but not e.g.
                # "paramfoo=...".
                if karg.startswith(param) and \
                    (len(param) == len(karg) or karg[len(param)] == '='):
                    kernel_args.append(karg)
        self.config_set('boot.default_args', ' '.join(kernel_args))


    def _cleanup_results_dir(self):
        """Delete everything in resultsdir"""
        assert os.path.exists(self.resultdir)
        list_files = glob.glob('%s/*' % self.resultdir)
        for f in list_files:
            if os.path.isdir(f):
                shutil.rmtree(f)
            elif os.path.isfile(f):
                os.remove(f)


    def disable_warnings(self, warning_type):
        """Record a marker telling log parsers to ignore warnings of the
        given type from this point on."""
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})
        time.sleep(self.WARNING_DISABLE_DELAY)


    def enable_warnings(self, warning_type):
        """Record a marker re-enabling warnings of the given type."""
        time.sleep(self.WARNING_DISABLE_DELAY)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def monitor_disk_usage(self, max_rate):
        """\
        Signal that the job should monitor disk space usage on /
        and generate a warning if a test uses up disk space at a
        rate exceeding 'max_rate'.

        Parameters:
             max_rate - the maximum allowed rate of disk consumption
                        during a test, in MB/hour, or 0 to indicate
                        no limit.
        """
        self.set_state('__monitor_disk', max_rate)
        self.max_disk_usage_rate = max_rate


    def relative_path(self, path):
        """\
        Return a path relative to the job results directory
        """
        head = len(self.resultdir) + 1     # remove the / inbetween
        return path[head:]


    def control_get(self):
        """Return the current control file path."""
        return self.control


    def control_set(self, control):
        """Set the control file path (stored as an absolute path)."""
        self.control = os.path.abspath(control)


    def harness_select(self, which):
        """Switch this job to a different server harness."""
        self.harness = harness.select(which, self)


    def config_set(self, name, value):
        """Set a job configuration value by dotted name."""
        self.config.set(name, value)


    def config_get(self, name):
        """Get a job configuration value by dotted name."""
        return self.config.get(name)


    def setup_dirs(self, results_dir, tmp_dir):
        """Create and return (results_dir, tmp_dir) for a build, choosing
        defaults under the job's resultdir/tmpdir when not supplied."""
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
                            kjob = None ):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)


    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
                                  build_dir, leave)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory in which the source is actually
                      untarred into. (ex: client/profilers/<name> for profilers)
        '''
        # No-op when no package repositories are configured; callers fall
        # back to locally available copies.
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a string list
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              base_packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass


    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.

        @raise NotAvailableError: if gcc is not found on PATH.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError, e:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.

        @raise error.TestError: if a dependency directory cannot be found
                locally after an (optional) fetch from the repositories.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            # Each dependency ships a <name>.py setup script run in place.
            utils.system('./' + dep + '.py')


    def _runtest(self, url, tag, args, dargs):
        """Fork and run a single test, classifying any failure.

        The test runs in a child process (parallel.fork_start) so that a
        crashing test cannot take down the job itself.
        """
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)
            parallel.fork_waitfor(self.resultdir, pid)
        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run

        @return: True if the test ran to completion, False if it raised a
                classified test exception (which is recorded, not re-raised).
        """

        if not url:
            raise TypeError("Test name is invalid. "
                            "Switched arguments?")
        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
        namelen = len(testname)
        dargs = dargs.copy()
        tntag = dargs.pop('tag', None)
        if tntag:  # per-test tag  is included in reported test name
            testname += '.' + str(tntag)
        run_number = self.get_run_number()
        if run_number:
            testname += '._%02d_' % run_number
            self.set_run_number(run_number + 1)
        if self._is_kernel_in_test_tag():
            testname += '.' + platform.release()
        if self._test_tag_prefix:
            testname += '.' + self._test_tag_prefix
        if self.testtag:  # job-level tag is included in reported test name
            testname += '.' + self.testtag
        subdir = testname
        sdtag = dargs.pop('subdir_tag', None)
        if sdtag:  # subdir-only tag is not included in reports
            subdir = subdir + '.' + str(sdtag)
        tag = subdir[namelen+1:]    # '' if none

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag) )
            raise error.TestError(msg)
        # NOTE: client/common_lib/test.py runtest() depends directory names
        # being constructed the same way as in this code.
        os.mkdir(outputdir)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            self._rungroup(subdir, testname, group_func)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.
519 520 521 def _rungroup(self, subdir, testname, function, *args, **dargs): 522 """\ 523 subdir: 524 name of the group 525 testname: 526 name of the test to run, or support step 527 function: 528 subroutine to run 529 *args: 530 arguments for the function 531 532 Returns the result of the passed in function 533 """ 534 535 try: 536 self.record('START', subdir, testname) 537 self._increment_group_level() 538 result = function(*args, **dargs) 539 self._decrement_group_level() 540 self.record('END GOOD', subdir, testname) 541 return result 542 except error.TestBaseException, e: 543 self._decrement_group_level() 544 self.record('END %s' % e.exit_status, subdir, testname) 545 raise 546 except error.JobError, e: 547 self._decrement_group_level() 548 self.record('END ABORT', subdir, testname) 549 raise 550 except Exception, e: 551 # This should only ever happen due to a bug in the given 552 # function's code. The common case of being called by 553 # run_test() will never reach this. If a control file called 554 # run_group() itself, bugs in its function will be caught 555 # here. 556 self._decrement_group_level() 557 err_msg = str(e) + '\n' + traceback.format_exc() 558 self.record('END ERROR', subdir, testname, err_msg) 559 raise 560 561 562 def run_group(self, function, tag=None, **dargs): 563 """ 564 Run a function nested within a group level. 565 566 function: 567 Callable to run. 568 tag: 569 An optional tag name for the group. If None (default) 570 function.__name__ will be used. 571 **dargs: 572 Named arguments for the function. 573 """ 574 if tag: 575 name = tag 576 else: 577 name = function.__name__ 578 579 try: 580 return self._rungroup(subdir=None, testname=name, 581 function=function, **dargs) 582 except (SystemExit, error.TestBaseException): 583 raise 584 # If there was a different exception, turn it into a TestError. 585 # It will be caught by step_engine or _run_step_fn. 
586 except Exception, e: 587 raise error.UnhandledTestError(e) 588 589 590 _RUN_NUMBER_STATE = '__run_number' 591 def get_run_number(self): 592 """Get the run number or 0 if no run number has been set.""" 593 return self.get_state(self._RUN_NUMBER_STATE, default=0) 594 595 596 def set_run_number(self, number): 597 """If the run number is non-zero it will be in the output dir name.""" 598 self.set_state(self._RUN_NUMBER_STATE, number) 599 600 601 _KERNEL_IN_TAG_STATE = '__kernel_version_in_test_tag' 602 def _is_kernel_in_test_tag(self): 603 """Boolean: should the kernel version be included in the test tag.""" 604 return self.get_state(self._KERNEL_IN_TAG_STATE, default=False) 605 606 607 def show_kernel_in_test_tag(self, value=True): 608 """If True, the kernel version at test time will prefix test tags.""" 609 self.set_state(self._KERNEL_IN_TAG_STATE, value) 610 611 612 def set_test_tag(self, tag=''): 613 """Set tag to be added to test name of all following run_test steps.""" 614 self.testtag = tag 615 616 617 def set_test_tag_prefix(self, prefix): 618 """Set a prefix string to prepend to all future run_test steps. 619 620 Args: 621 prefix: A string to prepend to any run_test() step tags separated by 622 a '.'; use the empty string '' to clear it. 
623 """ 624 self._test_tag_prefix = prefix 625 626 627 def cpu_count(self): 628 return utils.count_cpus() # use total system count 629 630 631 def start_reboot(self): 632 self.record('START', None, 'reboot') 633 self._increment_group_level() 634 self.record('GOOD', None, 'reboot.start') 635 636 637 def _record_reboot_failure(self, subdir, operation, status, 638 running_id=None): 639 self.record("ABORT", subdir, operation, status) 640 self._decrement_group_level() 641 if not running_id: 642 running_id = utils.running_os_ident() 643 kernel = {"kernel": running_id.split("::")[0]} 644 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) 645 646 647 def _check_post_reboot(self, subdir, running_id=None): 648 """ 649 Function to perform post boot checks such as if the mounted 650 partition list has changed across reboots. 651 """ 652 partition_list = partition_lib.get_partition_list(self) 653 mount_info = set((p.device, p.get_mountpoint()) for p in partition_list) 654 old_mount_info = self.get_state("__mount_info") 655 if mount_info != old_mount_info: 656 new_entries = mount_info - old_mount_info 657 old_entries = old_mount_info - mount_info 658 description = ("mounted partitions are different after reboot " 659 "(old entries: %s, new entries: %s)" % 660 (old_entries, new_entries)) 661 self._record_reboot_failure(subdir, "reboot.verify_config", 662 description, running_id=running_id) 663 raise error.JobError("Reboot failed: %s" % description) 664 665 666 def end_reboot(self, subdir, kernel, patches, running_id=None): 667 self._check_post_reboot(subdir, running_id=running_id) 668 669 # strip ::<timestamp> from the kernel version if present 670 kernel = kernel.split("::")[0] 671 kernel_info = {"kernel": kernel} 672 for i, patch in enumerate(patches): 673 kernel_info["patch%d" % i] = patch 674 self._decrement_group_level() 675 self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info) 676 677 678 def end_reboot_and_verify(self, expected_when, 
expected_id, subdir, 679 type='src', patches=[]): 680 """ Check the passed kernel identifier against the command line 681 and the running kernel, abort the job on missmatch. """ 682 683 logging.info("POST BOOT: checking booted kernel " 684 "mark=%d identity='%s' type='%s'", 685 expected_when, expected_id, type) 686 687 running_id = utils.running_os_ident() 688 689 cmdline = utils.read_one_line("/proc/cmdline") 690 691 find_sum = re.compile(r'.*IDENT=(\d+)') 692 m = find_sum.match(cmdline) 693 cmdline_when = -1 694 if m: 695 cmdline_when = int(m.groups()[0]) 696 697 # We have all the facts, see if they indicate we 698 # booted the requested kernel or not. 699 bad = False 700 if (type == 'src' and expected_id != running_id or 701 type == 'rpm' and 702 not running_id.startswith(expected_id + '::')): 703 logging.error("Kernel identifier mismatch") 704 bad = True 705 if expected_when != cmdline_when: 706 logging.error("Kernel command line mismatch") 707 bad = True 708 709 if bad: 710 logging.error(" Expected Ident: " + expected_id) 711 logging.error(" Running Ident: " + running_id) 712 logging.error(" Expected Mark: %d", expected_when) 713 logging.error("Command Line Mark: %d", cmdline_when) 714 logging.error(" Command Line: " + cmdline) 715 716 self._record_reboot_failure(subdir, "reboot.verify", "boot failure", 717 running_id=running_id) 718 raise error.JobError("Reboot returned with the wrong kernel") 719 720 self.record('GOOD', subdir, 'reboot.verify', 721 utils.running_os_full_version()) 722 self.end_reboot(subdir, expected_id, patches, running_id=running_id) 723 724 725 def partition(self, device, loop_size=0, mountpoint=None): 726 """ 727 Work with a machine partition 728 729 @param device: e.g. /dev/sda2, /dev/sdb1 etc... 730 @param mountpoint: Specify a directory to mount to. If not specified 731 autotest tmp directory will be used. 732 @param loop_size: Size of loopback device (in MB). Defaults to 0. 
733 734 @return: A L{client.bin.partition.partition} object 735 """ 736 737 if not mountpoint: 738 mountpoint = self.tmpdir 739 return partition_lib.partition(self, device, loop_size, mountpoint) 740 741 @utils.deprecated 742 def filesystem(self, device, mountpoint=None, loop_size=0): 743 """ Same as partition 744 745 @deprecated: Use partition method instead 746 """ 747 return self.partition(device, loop_size, mountpoint) 748 749 750 def enable_external_logging(self): 751 pass 752 753 754 def disable_external_logging(self): 755 pass 756 757 758 def enable_test_cleanup(self): 759 """ By default tests run test.cleanup """ 760 self.set_state("__run_test_cleanup", True) 761 self.run_test_cleanup = True 762 763 764 def disable_test_cleanup(self): 765 """ By default tests do not run test.cleanup """ 766 self.set_state("__run_test_cleanup", False) 767 self.run_test_cleanup = False 768 769 770 def default_test_cleanup(self, val): 771 if not self._is_continuation: 772 self.set_state("__run_test_cleanup", val) 773 self.run_test_cleanup = val 774 775 776 def default_boot_tag(self, tag): 777 if not self._is_continuation: 778 self.set_state("__last_boot_tag", tag) 779 self.last_boot_tag = tag 780 781 782 def default_tag(self, tag): 783 """Allows the scheduler's job tag to be passed in from autoserv.""" 784 if not self._is_continuation: 785 self.set_state("__job_tag", tag) 786 self.tag = tag 787 788 789 def reboot_setup(self): 790 # save the partition list and their mount point and compare it 791 # after reboot 792 partition_list = partition_lib.get_partition_list(self) 793 mount_info = set((p.device, p.get_mountpoint()) for p in partition_list) 794 self.set_state("__mount_info", mount_info) 795 796 797 def reboot(self, tag=LAST_BOOT_TAG): 798 if tag == LAST_BOOT_TAG: 799 tag = self.last_boot_tag 800 else: 801 self.set_state("__last_boot_tag", tag) 802 self.last_boot_tag = tag 803 804 self.reboot_setup() 805 self.harness.run_reboot() 806 default = 
self.config_get('boot.set_default') 807 if default: 808 self.bootloader.set_default(tag) 809 else: 810 self.bootloader.boot_once(tag) 811 812 # HACK: using this as a module sometimes hangs shutdown, so if it's 813 # installed unload it first 814 utils.system("modprobe -r netconsole", ignore_status=True) 815 816 # sync first, so that a sync during shutdown doesn't time out 817 utils.system("sync; sync", ignore_status=True) 818 819 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 820 self.quit() 821 822 823 def noop(self, text): 824 logging.info("job: noop: " + text) 825 826 827 @_run_test_complete_on_exit 828 def parallel(self, *tasklist): 829 """Run tasks in parallel""" 830 831 pids = [] 832 old_log_filename = self.log_filename 833 for i, task in enumerate(tasklist): 834 assert isinstance(task, (tuple, list)) 835 self.log_filename = old_log_filename + (".%d" % i) 836 task_func = lambda: task[0](*task[1:]) 837 pids.append(parallel.fork_start(self.resultdir, task_func)) 838 839 old_log_path = os.path.join(self.resultdir, old_log_filename) 840 old_log = open(old_log_path, "a") 841 exceptions = [] 842 for i, pid in enumerate(pids): 843 # wait for the task to finish 844 try: 845 parallel.fork_waitfor(self.resultdir, pid) 846 except Exception, e: 847 exceptions.append(e) 848 # copy the logs from the subtask into the main log 849 new_log_path = old_log_path + (".%d" % i) 850 if os.path.exists(new_log_path): 851 new_log = open(new_log_path) 852 old_log.write(new_log.read()) 853 new_log.close() 854 old_log.flush() 855 os.remove(new_log_path) 856 old_log.close() 857 858 self.log_filename = old_log_filename 859 860 # handle any exceptions raised by the parallel tasks 861 if exceptions: 862 msg = "%d task(s) failed in job.parallel" % len(exceptions) 863 raise error.JobError(msg) 864 865 866 def quit(self): 867 # XXX: should have a better name. 
868 self.harness.run_pause() 869 raise error.JobContinue("more to come") 870 871 872 def complete(self, status): 873 """Clean up and exit""" 874 # We are about to exit 'complete' so clean up the control file. 875 dest = os.path.join(self.resultdir, os.path.basename(self.state_file)) 876 os.rename(self.state_file, dest) 877 878 self.harness.run_complete() 879 self.disable_external_logging() 880 sys.exit(status) 881 882 883 def set_state(self, var, val): 884 # Deep copies make sure that the state can't be altered 885 # without it being re-written. Perf wise, deep copies 886 # are overshadowed by pickling/loading. 887 self.state[var] = copy.deepcopy(val) 888 outfile = open(self.state_file, 'wb') 889 try: 890 pickle.dump(self.state, outfile, pickle.HIGHEST_PROTOCOL) 891 finally: 892 outfile.close() 893 logging.debug("Persistent state variable %s now set to %r", var, val) 894 895 896 def _load_state(self): 897 if hasattr(self, 'state'): 898 raise RuntimeError('state already exists') 899 infile = None 900 try: 901 try: 902 infile = open(self.state_file, 'rb') 903 self.state = pickle.load(infile) 904 logging.info('Loaded state from %s', self.state_file) 905 except IOError: 906 self.state = {} 907 initialize = True 908 finally: 909 if infile: 910 infile.close() 911 912 initialize = '__steps' not in self.state 913 if not (self._is_continuation or initialize): 914 raise RuntimeError('Loaded state must contain __steps or be a ' 915 'continuation.') 916 917 if initialize: 918 logging.info('Initializing the state engine') 919 self.set_state('__steps', []) # writes pickle file 920 921 922 def get_state(self, var, default=NO_DEFAULT): 923 if var in self.state or default == NO_DEFAULT: 924 val = self.state[var] 925 else: 926 val = default 927 return copy.deepcopy(val) 928 929 930 def __create_step_tuple(self, fn, args, dargs): 931 # Legacy code passes in an array where the first arg is 932 # the function or its name. 
933 if isinstance(fn, list): 934 assert(len(args) == 0) 935 assert(len(dargs) == 0) 936 args = fn[1:] 937 fn = fn[0] 938 # Pickling actual functions is hairy, thus we have to call 939 # them by name. Unfortunately, this means only functions 940 # defined globally can be used as a next step. 941 if callable(fn): 942 fn = fn.__name__ 943 if not isinstance(fn, types.StringTypes): 944 raise StepError("Next steps must be functions or " 945 "strings containing the function name") 946 ancestry = copy.copy(self.current_step_ancestry) 947 return (ancestry, fn, args, dargs) 948 949 950 def next_step_append(self, fn, *args, **dargs): 951 """Define the next step and place it at the end""" 952 steps = self.get_state('__steps') 953 steps.append(self.__create_step_tuple(fn, args, dargs)) 954 self.set_state('__steps', steps) 955 956 957 def next_step(self, fn, *args, **dargs): 958 """Create a new step and place it after any steps added 959 while running the current step but before any steps added in 960 previous steps""" 961 steps = self.get_state('__steps') 962 steps.insert(self.next_step_index, 963 self.__create_step_tuple(fn, args, dargs)) 964 self.next_step_index += 1 965 self.set_state('__steps', steps) 966 967 968 def next_step_prepend(self, fn, *args, **dargs): 969 """Insert a new step, executing first""" 970 steps = self.get_state('__steps') 971 steps.insert(0, self.__create_step_tuple(fn, args, dargs)) 972 self.next_step_index += 1 973 self.set_state('__steps', steps) 974 975 976 def _run_step_fn(self, local_vars, fn, args, dargs): 977 """Run a (step) function within the given context""" 978 979 local_vars['__args'] = args 980 local_vars['__dargs'] = dargs 981 try: 982 exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars) 983 return local_vars['__ret'] 984 except SystemExit: 985 raise # Send error.JobContinue and JobComplete on up to runjob. 
986 except error.TestNAError, detail: 987 self.record(detail.exit_status, None, fn, str(detail)) 988 except Exception, detail: 989 raise error.UnhandledJobError(detail) 990 991 992 def _create_frame(self, global_vars, ancestry, fn_name): 993 """Set up the environment like it would have been when this 994 function was first defined. 995 996 Child step engine 'implementations' must have 'return locals()' 997 at end end of their steps. Because of this, we can call the 998 parent function and get back all child functions (i.e. those 999 defined within it). 1000 1001 Unfortunately, the call stack of the function calling 1002 job.next_step might have been deeper than the function it 1003 added. In order to make sure that the environment is what it 1004 should be, we need to then pop off the frames we built until 1005 we find the frame where the function was first defined.""" 1006 1007 # The copies ensure that the parent frames are not modified 1008 # while building child frames. This matters if we then 1009 # pop some frames in the next part of this function. 1010 current_frame = copy.copy(global_vars) 1011 frames = [current_frame] 1012 for steps_fn_name in ancestry: 1013 ret = self._run_step_fn(current_frame, steps_fn_name, [], {}) 1014 current_frame = copy.copy(ret) 1015 frames.append(current_frame) 1016 1017 # Walk up the stack frames until we find the place fn_name was defined. 1018 while len(frames) > 2: 1019 if fn_name not in frames[-2]: 1020 break 1021 if frames[-2][fn_name] != frames[-1][fn_name]: 1022 break 1023 frames.pop() 1024 ancestry.pop() 1025 1026 return (frames[-1], ancestry) 1027 1028 1029 def _add_step_init(self, local_vars, current_function): 1030 """If the function returned a dictionary that includes a 1031 function named 'step_init', prepend it to our list of steps. 
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self.current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            # Run the control file's top level; it may execute tests directly
            # or just define step functions for the loop below.
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])

        # Iterate through the steps. If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self.get_state('__steps')) > 0:
            # Pop the head step and persist the shortened queue immediately,
            # so a reboot mid-step resumes at the *next* step.
            steps = self.get_state('__steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self.set_state('__steps', steps)

            self.next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self.current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        """Log the output of a shell command as part of sysinfo collection."""
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        """Capture a copy of the given file as part of sysinfo collection."""
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        # on_every_test selects per-test collection; otherwise the loggable
        # is collected once per boot.  Persist so continuations keep it.
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        """Restore sysinfo configuration from the job state file, if any."""
        state = self.get_state("__sysinfo", None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        """Persist sysinfo configuration into the job state file."""
        state = self.sysinfo.serialize()
        self.set_state("__sysinfo", state)


    def _init_group_level(self):
        # group_level drives the tab indentation of status-log records.
        self.group_level = self.get_state("__group_level", default=0)


    def _increment_group_level(self):
        self.group_level += 1
        self.set_state("__group_level", self.group_level)


    def _decrement_group_level(self):
        self.group_level -= 1
        self.set_state("__group_level", self.group_level)


    def record(self, status_code, subdir, operation, status = '',
               optional_fields=None):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable.
That involves a little more complexity, but 1136 really isn't all that bad ;-) 1137 1138 Format is <status code>\t<subdir>\t<operation>\t<status> 1139 1140 status code: (GOOD|WARN|FAIL|ABORT) 1141 or START 1142 or END (GOOD|WARN|FAIL|ABORT) 1143 1144 subdir: MUST be a relevant subdirectory in the results, 1145 or None, which will be represented as '----' 1146 1147 operation: description of what you ran (e.g. "dbench", or 1148 "mkfs -t foobar /dev/sda9") 1149 1150 status: error message or "completed sucessfully" 1151 1152 ------------------------------------------------------------ 1153 1154 Initial tabs indicate indent levels for grouping, and is 1155 governed by self.group_level 1156 1157 multiline messages have secondary lines prefaced by a double 1158 space (' ') 1159 """ 1160 1161 if subdir: 1162 if re.match(r'[\n\t]', subdir): 1163 raise ValueError("Invalid character in subdir string") 1164 substr = subdir 1165 else: 1166 substr = '----' 1167 1168 if not log.is_valid_status(status_code): 1169 raise ValueError("Invalid status code supplied: %s" % status_code) 1170 if not operation: 1171 operation = '----' 1172 1173 if re.match(r'[\n\t]', operation): 1174 raise ValueError("Invalid character in operation string") 1175 operation = operation.rstrip() 1176 1177 if not optional_fields: 1178 optional_fields = {} 1179 1180 status = status.rstrip() 1181 status = re.sub(r"\t", " ", status) 1182 # Ensure any continuation lines are marked so we can 1183 # detect them in the status file to ensure it is parsable. 
1184 status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", status) 1185 1186 # Generate timestamps for inclusion in the logs 1187 epoch_time = int(time.time()) # seconds since epoch, in UTC 1188 local_time = time.localtime(epoch_time) 1189 optional_fields["timestamp"] = str(epoch_time) 1190 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", 1191 local_time) 1192 1193 fields = [status_code, substr, operation] 1194 fields += ["%s=%s" % x for x in optional_fields.iteritems()] 1195 fields.append(status) 1196 1197 msg = '\t'.join(str(x) for x in fields) 1198 msg = '\t' * self.group_level + msg 1199 1200 msg_tag = "" 1201 if "." in self.log_filename: 1202 msg_tag = self.log_filename.split(".", 1)[1] 1203 1204 self.harness.test_status_detail(status_code, substr, operation, status, 1205 msg_tag) 1206 self.harness.test_status(msg, msg_tag) 1207 1208 # log to stdout (if enabled) 1209 #if self.log_filename == self.DEFAULT_LOG_FILENAME: 1210 logging.info(msg) 1211 1212 # log to the "root" status log 1213 status_file = os.path.join(self.resultdir, self.log_filename) 1214 open(status_file, "a").write(msg + "\n") 1215 1216 # log to the subdir status log (if subdir is set) 1217 if subdir: 1218 dir = os.path.join(self.resultdir, subdir) 1219 status_file = os.path.join(dir, self.DEFAULT_LOG_FILENAME) 1220 open(status_file, "a").write(msg + "\n") 1221 1222 1223class disk_usage_monitor: 1224 def __init__(self, logging_func, device, max_mb_per_hour): 1225 self.func = logging_func 1226 self.device = device 1227 self.max_mb_per_hour = max_mb_per_hour 1228 1229 1230 def start(self): 1231 self.initial_space = utils.freespace(self.device) 1232 self.start_time = time.time() 1233 1234 1235 def stop(self): 1236 # if no maximum usage rate was set, we don't need to 1237 # generate any warnings 1238 if not self.max_mb_per_hour: 1239 return 1240 1241 final_space = utils.freespace(self.device) 1242 used_space = self.initial_space - final_space 1243 stop_time = time.time() 1244 
total_time = stop_time - self.start_time 1245 # round up the time to one minute, to keep extremely short 1246 # tests from generating false positives due to short, badly 1247 # timed bursts of activity 1248 total_time = max(total_time, 60.0) 1249 1250 # determine the usage rate 1251 bytes_per_sec = used_space / total_time 1252 mb_per_sec = bytes_per_sec / 1024**2 1253 mb_per_hour = mb_per_sec * 60 * 60 1254 1255 if mb_per_hour > self.max_mb_per_hour: 1256 msg = ("disk space on %s was consumed at a rate of %.2f MB/hour") 1257 msg %= (self.device, mb_per_hour) 1258 self.func(msg) 1259 1260 1261 @classmethod 1262 def watch(cls, *monitor_args, **monitor_dargs): 1263 """ Generic decorator to wrap a function call with the 1264 standard create-monitor -> start -> call -> stop idiom.""" 1265 def decorator(func): 1266 def watched_func(*args, **dargs): 1267 monitor = cls(*monitor_args, **monitor_dargs) 1268 monitor.start() 1269 try: 1270 func(*args, **dargs) 1271 finally: 1272 monitor.stop() 1273 return watched_func 1274 return decorator 1275 1276 1277def runjob(control, options): 1278 """ 1279 Run a job using the given control file. 1280 1281 This is the main interface to this module. 1282 1283 @see base_job.__init__ for parameter info. 1284 """ 1285 control = os.path.abspath(control) 1286 state = control + '.state' 1287 # Ensure state file is cleaned up before the job starts to run if autotest 1288 # is not running with the --continue flag 1289 if not options.cont and os.path.isfile(state): 1290 logging.debug('Cleaning up previously found state file') 1291 os.remove(state) 1292 1293 # instantiate the job object ready for the control file. 1294 myjob = None 1295 try: 1296 # Check that the control file is valid 1297 if not os.path.exists(control): 1298 raise error.JobError(control + ": control file not found") 1299 1300 # When continuing, the job is complete when there is no 1301 # state file, ensure we don't try and continue. 
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control, options)

        # Load in the users control file, may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        # NOTE(review): exit code 5 appears to signal the harness/wrapper to
        # re-invoke autotest with --continue (e.g. across a reboot) -- confirm
        # against the autotest launcher.
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError, instance:
        # args[0] is the message; args[1] (if present) is the failed command.
        logging.error("JOB ERROR: " + instance.args[0])
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
            myjob.record('ABORT', None, command, instance.args[0])
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, instance.args[0])
            assert (myjob.group_level == 0), ('myjob.group_level must be 0,'
                                              ' not %d' % myjob.group_level)
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us.  If we get here, its likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, msg)
            assert(myjob.group_level == 0)
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob._decrement_group_level()
    myjob.record('END GOOD', None, None)
    assert(myjob.group_level == 0)

    # complete() does not return (it exits/hands off to the harness).
    myjob.complete(0)


# Allow a site-specific subclass of base_job to be interposed; falls back to
# base_job itself when no site_job module is present.
site_job = utils.import_site_class(
    __file__, "autotest_lib.client.bin.site_job", "site_job", base_job)

class job(site_job):
    # Concrete job class used by runjob(); all behavior lives in
    # base_job/site_job.
    pass