job.py revision fc3da5bbdcd63840cd224282812c050c5520db11
1"""The main job wrapper 2 3This is the core infrastructure. 4 5Copyright Andy Whitcroft, Martin J. Bligh 2006 6""" 7 8import copy, os, platform, re, shutil, sys, time, traceback, types, glob 9import logging, getpass, errno 10import cPickle as pickle 11from autotest_lib.client.bin import client_logging_config 12from autotest_lib.client.bin import utils, parallel, kernel, xen 13from autotest_lib.client.bin import profilers, boottool, harness 14from autotest_lib.client.bin import config, sysinfo, test, local_host 15from autotest_lib.client.bin import partition as partition_lib 16from autotest_lib.client.common_lib import base_job 17from autotest_lib.client.common_lib import error, barrier, log, logging_manager 18from autotest_lib.client.common_lib import base_packages, packages 19from autotest_lib.client.common_lib import global_config 20 21 22LAST_BOOT_TAG = object() 23JOB_PREAMBLE = """ 24from autotest_lib.client.common_lib.error import * 25from autotest_lib.client.bin.utils import * 26""" 27 28 29class StepError(error.AutotestError): 30 pass 31 32class NotAvailableError(error.AutotestError): 33 pass 34 35 36 37def _run_test_complete_on_exit(f): 38 """Decorator for job methods that automatically calls 39 self.harness.run_test_complete when the method exits, if appropriate.""" 40 def wrapped(self, *args, **dargs): 41 try: 42 return f(self, *args, **dargs) 43 finally: 44 if self._log_filename == self._DEFAULT_LOG_FILENAME: 45 self.harness.run_test_complete() 46 if self.drop_caches: 47 logging.debug("Dropping caches") 48 utils.drop_caches() 49 wrapped.__name__ = f.__name__ 50 wrapped.__doc__ = f.__doc__ 51 wrapped.__dict__.update(f.__dict__) 52 return wrapped 53 54 55class base_client_job(base_job.base_job): 56 """The client-side concrete implementation of base_job. 
    def __init__(self, control, options, drop_caches=True,
                 extra_copy_cmdline=None):
        """
        Prepare a client side job object.

        @param control: The control file (pathname of).
        @param options: an object which includes:
                jobtag: The job tag string (eg "default").
                cont: If this is the continuation of this job.
                harness_type: An alternative server harness.  [None]
                use_external_logging: If true, the enable_external_logging
                      method will be called during construction.  [False]
        @param drop_caches: If true, utils.drop_caches() is called before and
                between all tests.  [True]
        @param extra_copy_cmdline: list of additional /proc/cmdline arguments to
                copy from the running kernel to all the installed kernels with
                this job
        """
        super(base_client_job, self).__init__(options=options)
        # Initialization is split in two phases so that a failure in the
        # second phase can still be recorded: _pre_record_init() sets up
        # just enough state (resultdir, logging, harness) for self.record()
        # to work.
        self._pre_record_init(control, options)
        try:
            self._post_record_init(control, options, drop_caches,
                                   extra_copy_cmdline)
        except Exception, err:
            self.record(
                    'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
                    str(err))
            raise


    @classmethod
    def _get_environ_autodir(cls):
        # Indirection point so subclasses/tests can override autodir lookup.
        return os.environ['AUTODIR']


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir and clientdir (which are the same)
        using os.environ. Serverdir does not exist in this context.
        """
        autodir = clientdir = cls._get_environ_autodir()
        return autodir, clientdir, None


    def _find_resultdir(self, options):
        """
        Determine the directory for storing results. On a client this is
        always <autodir>/results/<tag>, where tag is passed in on the command
        line as an option.
        """
        return os.path.join(self.autodir, 'results', options.tag)
118 """ 119 return os.path.join(self.autodir, 'results', options.tag) 120 121 122 def _pre_record_init(self, control, options): 123 """ 124 Initialization function that should peform ONLY the required 125 setup so that the self.record() method works. 126 127 As of now self.record() needs self.resultdir, self._group_level, 128 self._log_filename, self.harness. 129 """ 130 if not options.cont: 131 self._cleanup_results_dir() 132 133 logging_manager.configure_logging( 134 client_logging_config.ClientLoggingConfig(), 135 results_dir=self.resultdir, 136 verbose=options.verbose) 137 logging.info('Writing results to %s', self.resultdir) 138 139 self._log_filename = self._DEFAULT_LOG_FILENAME 140 141 # init_group_level needs the state 142 self.control = os.path.realpath(control) 143 self._is_continuation = options.cont 144 self._current_step_ancestry = [] 145 self._next_step_index = 0 146 self._load_state() 147 148 self._init_group_level() 149 150 self.harness = harness.select(options.harness, self) 151 152 153 def _post_record_init(self, control, options, drop_caches, 154 extra_copy_cmdline): 155 """ 156 Perform job initialization not required by self.record(). 
157 """ 158 self._init_drop_caches(drop_caches) 159 160 self._init_packages() 161 162 self.sysinfo = sysinfo.sysinfo(self.resultdir) 163 self._load_sysinfo_state() 164 165 self.tag = self.get_state("__job_tag", default=None) 166 167 if not options.cont: 168 download = os.path.join(self.testdir, 'download') 169 if not os.path.exists(download): 170 os.mkdir(download) 171 172 os.makedirs(os.path.join(self.resultdir, 'analysis')) 173 174 shutil.copyfile(self.control, 175 os.path.join(self.resultdir, 'control')) 176 177 self.control = control 178 179 self.logging = logging_manager.get_logging_manager( 180 manage_stdout_and_stderr=True, redirect_fds=True) 181 self.logging.start_logging() 182 183 self._config = config.config(self) 184 self.profilers = profilers.profilers(self) 185 186 self._init_bootloader() 187 188 self.machines = [options.hostname] 189 self.hosts = set([local_host.LocalHost(hostname=options.hostname, 190 bootloader=self.bootloader)]) 191 192 if options.user: 193 self.user = options.user 194 else: 195 self.user = getpass.getuser() 196 197 self.sysinfo.log_per_reboot_data() 198 199 if not options.cont: 200 self.record('START', None, None) 201 self._increment_group_level() 202 203 self.harness.run_start() 204 205 if options.log: 206 self.enable_external_logging() 207 208 # load the max disk usage rate - default to no monitoring 209 self._max_disk_usage_rate = self.get_state('__monitor_disk', 210 default=0.0) 211 212 self._init_cmdline(extra_copy_cmdline) 213 214 self.num_tests_run = None 215 self.num_tests_failed = None 216 217 self.warning_loggers = None 218 self.warning_manager = None 219 220 221 def _init_drop_caches(self, drop_caches): 222 """ 223 Perform the drop caches initialization. 
224 """ 225 self.drop_caches_between_iterations = ( 226 global_config.global_config.get_config_value('CLIENT', 227 'drop_caches_between_iterations', 228 type=bool, default=True)) 229 self.drop_caches = drop_caches 230 if self.drop_caches: 231 logging.debug("Dropping caches") 232 utils.drop_caches() 233 234 235 def _init_bootloader(self): 236 """ 237 Perform boottool initialization. 238 """ 239 tool = self.config_get('boottool.executable') 240 self.bootloader = boottool.boottool(tool) 241 242 243 def _init_packages(self): 244 """ 245 Perform the packages support initialization. 246 """ 247 self.pkgmgr = packages.PackageManager( 248 self.autodir, run_function_dargs={'timeout':3600}) 249 250 251 def _init_cmdline(self, extra_copy_cmdline): 252 """ 253 Initialize default cmdline for booted kernels in this job. 254 """ 255 copy_cmdline = set(['console']) 256 if extra_copy_cmdline is not None: 257 copy_cmdline.update(extra_copy_cmdline) 258 259 # extract console= and other args from cmdline and add them into the 260 # base args that we use for all kernels we install 261 cmdline = utils.read_one_line('/proc/cmdline') 262 kernel_args = [] 263 for karg in cmdline.split(): 264 for param in copy_cmdline: 265 if karg.startswith(param) and \ 266 (len(param) == len(karg) or karg[len(param)] == '='): 267 kernel_args.append(karg) 268 self.config_set('boot.default_args', ' '.join(kernel_args)) 269 270 271 def _cleanup_results_dir(self): 272 """Delete everything in resultsdir""" 273 assert os.path.exists(self.resultdir) 274 list_files = glob.glob('%s/*' % self.resultdir) 275 for f in list_files: 276 if os.path.isdir(f): 277 shutil.rmtree(f) 278 elif os.path.isfile(f): 279 os.remove(f) 280 281 282 def disable_warnings(self, warning_type): 283 self.record("INFO", None, None, 284 "disabling %s warnings" % warning_type, 285 {"warnings.disable": warning_type}) 286 time.sleep(self._WARNING_DISABLE_DELAY) 287 288 289 def enable_warnings(self, warning_type): 290 
time.sleep(self._WARNING_DISABLE_DELAY) 291 self.record("INFO", None, None, 292 "enabling %s warnings" % warning_type, 293 {"warnings.enable": warning_type}) 294 295 296 def monitor_disk_usage(self, max_rate): 297 """\ 298 Signal that the job should monitor disk space usage on / 299 and generate a warning if a test uses up disk space at a 300 rate exceeding 'max_rate'. 301 302 Parameters: 303 max_rate - the maximium allowed rate of disk consumption 304 during a test, in MB/hour, or 0 to indicate 305 no limit. 306 """ 307 self.set_state('__monitor_disk', max_rate) 308 self._max_disk_usage_rate = max_rate 309 310 311 def relative_path(self, path): 312 """\ 313 Return a patch relative to the job results directory 314 """ 315 head = len(self.resultdir) + 1 # remove the / inbetween 316 return path[head:] 317 318 319 def control_get(self): 320 return self.control 321 322 323 def control_set(self, control): 324 self.control = os.path.abspath(control) 325 326 327 def harness_select(self, which): 328 self.harness = harness.select(which, self) 329 330 331 def config_set(self, name, value): 332 self._config.set(name, value) 333 334 335 def config_get(self, name): 336 return self._config.get(name) 337 338 339 def setup_dirs(self, results_dir, tmp_dir): 340 if not tmp_dir: 341 tmp_dir = os.path.join(self.tmpdir, 'build') 342 if not os.path.exists(tmp_dir): 343 os.mkdir(tmp_dir) 344 if not os.path.isdir(tmp_dir): 345 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 346 raise ValueError(e_msg) 347 348 # We label the first build "build" and then subsequent ones 349 # as "build.2", "build.3", etc. Whilst this is a little bit 350 # inconsistent, 99.9% of jobs will only have one build 351 # (that's not done as kernbench, sparse, or buildtest), 352 # so it works out much cleaner. One of life's comprimises. 
    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
                            kjob = None ):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)


    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
                                  build_dir, leave)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory in which the source is actually
                      untarred into. (ex: client/profilers/<name> for profilers)
        '''
        # No-op when no repositories are configured; callers fall back to
        # locally available copies.
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a string list
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              base_packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass
The repository list 405 needs to be a string list 406 Ex: job.add_repository(['http://blah1','http://blah2']) 407 ''' 408 for repo_url in repo_urls: 409 self.pkgmgr.add_repository(repo_url) 410 411 # Fetch the packages' checksum file that contains the checksums 412 # of all the packages if it is not already fetched. The checksum 413 # is always fetched whenever a job is first started. This 414 # is not done in the job's constructor as we don't have the list of 415 # the repositories there (and obviously don't care about this file 416 # if we are not using the repos) 417 try: 418 checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir, 419 base_packages.CHECKSUM_FILE) 420 self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE, 421 checksum_file_path, use_checksum=False) 422 except error.PackageFetchError: 423 # packaging system might not be working in this case 424 # Silently fall back to the normal case 425 pass 426 427 428 def require_gcc(self): 429 """ 430 Test whether gcc is installed on the machine. 431 """ 432 # check if gcc is installed on the system. 433 try: 434 utils.system('which gcc') 435 except error.CmdError, e: 436 raise NotAvailableError('gcc is required by this job and is ' 437 'not available on the system') 438 439 440 def setup_dep(self, deps): 441 """Set up the dependencies for this test. 442 deps is a list of libraries required for this test. 443 """ 444 # Fetch the deps from the repositories and set them up. 445 for dep in deps: 446 dep_dir = os.path.join(self.autodir, 'deps', dep) 447 # Search for the dependency in the repositories if specified, 448 # else check locally. 
    def _runtest(self, url, tag, args, dargs):
        # Run the actual test in a forked child so that a crashing test
        # cannot take down the job process; the parent just waits.
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)
            parallel.fork_waitfor(self.resultdir, pid)
        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns True if the test passes, False otherwise.
        """
        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        # Watch disk consumption on / while the test runs, logging a WARN
        # if it exceeds the configured rate (0 disables monitoring).
        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            self._rungroup(subdir, testname, group_func)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.
    def _rungroup(self, subdir, testname, function, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed in function
        """

        try:
            self.record('START', subdir, testname)
            self._increment_group_level()
            result = function(*args, **dargs)
            self._decrement_group_level()
            self.record('END GOOD', subdir, testname)
            return result
        except error.TestBaseException, e:
            # Test-level failure: record END with its classified status.
            self._decrement_group_level()
            self.record('END %s' % e.exit_status, subdir, testname)
            raise
        except error.JobError, e:
            self._decrement_group_level()
            self.record('END ABORT', subdir, testname)
            raise
        except Exception, e:
            # This should only ever happen due to a bug in the given
            # function's code.  The common case of being called by
            # run_test() will never reach this.  If a control file called
            # run_group() itself, bugs in its function will be caught
            # here.
            self._decrement_group_level()
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END ERROR', subdir, testname, err_msg)
            raise


    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group.  If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)
    def cpu_count(self):
        return utils.count_cpus()  # use total system count


    def start_reboot(self):
        # Open a "reboot" status group; end_reboot()/_record_reboot_failure()
        # close it on the other side of the boot.
        self.record('START', None, 'reboot')
        self._increment_group_level()
        self.record('GOOD', None, 'reboot.start')


    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        """Record a failed reboot and close the open reboot status group."""
        self.record("ABORT", subdir, operation, status)
        self._decrement_group_level()
        if not running_id:
            running_id = utils.running_os_ident()
        # running_id is "<kernel>::<timestamp>"; report just the kernel part.
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)


    def _check_post_reboot(self, subdir, running_id=None):
        """
        Function to perform post boot checks such as if the mounted
        partition list has changed across reboots.

        @raise error.JobError: If the mounted partitions differ from the
                set saved by reboot_setup() before the reboot.
        """
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = set((p.device, p.get_mountpoint()) for p in partition_list)
        old_mount_info = self.get_state("__mount_info")
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)
615 """ 616 partition_list = partition_lib.get_partition_list(self, 617 exclude_swap=False) 618 mount_info = set((p.device, p.get_mountpoint()) for p in partition_list) 619 old_mount_info = self.get_state("__mount_info") 620 if mount_info != old_mount_info: 621 new_entries = mount_info - old_mount_info 622 old_entries = old_mount_info - mount_info 623 description = ("mounted partitions are different after reboot " 624 "(old entries: %s, new entries: %s)" % 625 (old_entries, new_entries)) 626 self._record_reboot_failure(subdir, "reboot.verify_config", 627 description, running_id=running_id) 628 raise error.JobError("Reboot failed: %s" % description) 629 630 631 def end_reboot(self, subdir, kernel, patches, running_id=None): 632 self._check_post_reboot(subdir, running_id=running_id) 633 634 # strip ::<timestamp> from the kernel version if present 635 kernel = kernel.split("::")[0] 636 kernel_info = {"kernel": kernel} 637 for i, patch in enumerate(patches): 638 kernel_info["patch%d" % i] = patch 639 self._decrement_group_level() 640 self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info) 641 642 643 def end_reboot_and_verify(self, expected_when, expected_id, subdir, 644 type='src', patches=[]): 645 """ Check the passed kernel identifier against the command line 646 and the running kernel, abort the job on missmatch. """ 647 648 logging.info("POST BOOT: checking booted kernel " 649 "mark=%d identity='%s' type='%s'", 650 expected_when, expected_id, type) 651 652 running_id = utils.running_os_ident() 653 654 cmdline = utils.read_one_line("/proc/cmdline") 655 656 find_sum = re.compile(r'.*IDENT=(\d+)') 657 m = find_sum.match(cmdline) 658 cmdline_when = -1 659 if m: 660 cmdline_when = int(m.groups()[0]) 661 662 # We have all the facts, see if they indicate we 663 # booted the requested kernel or not. 
664 bad = False 665 if (type == 'src' and expected_id != running_id or 666 type == 'rpm' and 667 not running_id.startswith(expected_id + '::')): 668 logging.error("Kernel identifier mismatch") 669 bad = True 670 if expected_when != cmdline_when: 671 logging.error("Kernel command line mismatch") 672 bad = True 673 674 if bad: 675 logging.error(" Expected Ident: " + expected_id) 676 logging.error(" Running Ident: " + running_id) 677 logging.error(" Expected Mark: %d", expected_when) 678 logging.error("Command Line Mark: %d", cmdline_when) 679 logging.error(" Command Line: " + cmdline) 680 681 self._record_reboot_failure(subdir, "reboot.verify", "boot failure", 682 running_id=running_id) 683 raise error.JobError("Reboot returned with the wrong kernel") 684 685 self.record('GOOD', subdir, 'reboot.verify', 686 utils.running_os_full_version()) 687 self.end_reboot(subdir, expected_id, patches, running_id=running_id) 688 689 690 def partition(self, device, loop_size=0, mountpoint=None): 691 """ 692 Work with a machine partition 693 694 @param device: e.g. /dev/sda2, /dev/sdb1 etc... 695 @param mountpoint: Specify a directory to mount to. If not specified 696 autotest tmp directory will be used. 697 @param loop_size: Size of loopback device (in MB). Defaults to 0. 
698 699 @return: A L{client.bin.partition.partition} object 700 """ 701 702 if not mountpoint: 703 mountpoint = self.tmpdir 704 return partition_lib.partition(self, device, loop_size, mountpoint) 705 706 @utils.deprecated 707 def filesystem(self, device, mountpoint=None, loop_size=0): 708 """ Same as partition 709 710 @deprecated: Use partition method instead 711 """ 712 return self.partition(device, loop_size, mountpoint) 713 714 715 def enable_external_logging(self): 716 pass 717 718 719 def disable_external_logging(self): 720 pass 721 722 723 def default_tag(self, tag): 724 """Allows the scheduler's job tag to be passed in from autoserv.""" 725 if not self._is_continuation: 726 self.set_state("__job_tag", tag) 727 self.tag = tag 728 729 730 def reboot_setup(self): 731 # save the partition list and their mount point and compare it 732 # after reboot 733 partition_list = partition_lib.get_partition_list(self, 734 exclude_swap=False) 735 mount_info = set((p.device, p.get_mountpoint()) for p in partition_list) 736 self.set_state("__mount_info", mount_info) 737 738 739 def reboot(self, tag=LAST_BOOT_TAG): 740 if tag == LAST_BOOT_TAG: 741 tag = self.last_boot_tag 742 else: 743 self.last_boot_tag = tag 744 745 self.reboot_setup() 746 self.harness.run_reboot() 747 default = self.config_get('boot.set_default') 748 if default: 749 self.bootloader.set_default(tag) 750 else: 751 self.bootloader.boot_once(tag) 752 753 # HACK: using this as a module sometimes hangs shutdown, so if it's 754 # installed unload it first 755 utils.system("modprobe -r netconsole", ignore_status=True) 756 757 # sync first, so that a sync during shutdown doesn't time out 758 utils.system("sync; sync", ignore_status=True) 759 760 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 761 self.quit() 762 763 764 def noop(self, text): 765 logging.info("job: noop: " + text) 766 767 768 @_run_test_complete_on_exit 769 def parallel(self, *tasklist): 770 """Run tasks in parallel""" 771 772 pids = 
[] 773 old_log_filename = self._log_filename 774 for i, task in enumerate(tasklist): 775 assert isinstance(task, (tuple, list)) 776 self._log_filename = old_log_filename + (".%d" % i) 777 task_func = lambda: task[0](*task[1:]) 778 pids.append(parallel.fork_start(self.resultdir, task_func)) 779 780 old_log_path = os.path.join(self.resultdir, old_log_filename) 781 old_log = open(old_log_path, "a") 782 exceptions = [] 783 for i, pid in enumerate(pids): 784 # wait for the task to finish 785 try: 786 parallel.fork_waitfor(self.resultdir, pid) 787 except Exception, e: 788 exceptions.append(e) 789 # copy the logs from the subtask into the main log 790 new_log_path = old_log_path + (".%d" % i) 791 if os.path.exists(new_log_path): 792 new_log = open(new_log_path) 793 old_log.write(new_log.read()) 794 new_log.close() 795 old_log.flush() 796 os.remove(new_log_path) 797 old_log.close() 798 799 self._log_filename = old_log_filename 800 801 # handle any exceptions raised by the parallel tasks 802 if exceptions: 803 msg = "%d task(s) failed in job.parallel" % len(exceptions) 804 raise error.JobError(msg) 805 806 807 def quit(self): 808 # XXX: should have a better name. 809 self.harness.run_pause() 810 raise error.JobContinue("more to come") 811 812 813 def complete(self, status): 814 """Clean up and exit""" 815 # We are about to exit 'complete' so clean up the control file. 
    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            # Stale steps from a previous job would be silently re-run;
            # fail loudly instead.
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.info('Initializing the state engine')
            self._state.set('client', 'steps', [])


    def __create_step_tuple(self, fn, args, dargs):
        """Normalize a step spec into a picklable (ancestry, name, args,
        dargs) tuple for the step engine state."""
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)
    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        # Keep the insertion point after everything added by this step.
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)



    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        # Steps are stored by name (see __create_step_tuple), so the call
        # has to be assembled and exec'd inside the step's namespace.
        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)
    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            # Persist the shortened list immediately so a reboot mid-step
            # does not re-run this step.
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        # Per-test loggables run around each test; boot loggables run once
        # per boot.
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


    def _init_group_level(self):
        # Group nesting level survives reboots via the job state.
        self._group_level = self.get_state("__group_level", default=0)


    def _increment_group_level(self):
        self._group_level += 1
        self.set_state("__group_level", self._group_level)


    def _decrement_group_level(self):
        self._group_level -= 1
        self.set_state("__group_level", self._group_level)
That involves a little more complexity, but 1050 really isn't all that bad ;-) 1051 1052 Format is <status code>\t<subdir>\t<operation>\t<status> 1053 1054 status code: (GOOD|WARN|FAIL|ABORT) 1055 or START 1056 or END (GOOD|WARN|FAIL|ABORT) 1057 1058 subdir: MUST be a relevant subdirectory in the results, 1059 or None, which will be represented as '----' 1060 1061 operation: description of what you ran (e.g. "dbench", or 1062 "mkfs -t foobar /dev/sda9") 1063 1064 status: error message or "completed sucessfully" 1065 1066 ------------------------------------------------------------ 1067 1068 Initial tabs indicate indent levels for grouping, and is 1069 governed by self.group_level 1070 1071 multiline messages have secondary lines prefaced by a double 1072 space (' ') 1073 """ 1074 1075 if subdir: 1076 if re.match(r'[\n\t]', subdir): 1077 raise ValueError("Invalid character in subdir string") 1078 substr = subdir 1079 else: 1080 substr = '----' 1081 1082 if not log.is_valid_status(status_code): 1083 raise ValueError("Invalid status code supplied: %s" % status_code) 1084 if not operation: 1085 operation = '----' 1086 1087 if re.match(r'[\n\t]', operation): 1088 raise ValueError("Invalid character in operation string") 1089 operation = operation.rstrip() 1090 1091 if not optional_fields: 1092 optional_fields = {} 1093 1094 status = status.rstrip() 1095 status = re.sub(r"\t", " ", status) 1096 # Ensure any continuation lines are marked so we can 1097 # detect them in the status file to ensure it is parsable. 
1098 status = re.sub(r"\n", "\n" + "\t" * self._group_level + " ", status) 1099 1100 # Generate timestamps for inclusion in the logs 1101 epoch_time = int(time.time()) # seconds since epoch, in UTC 1102 local_time = time.localtime(epoch_time) 1103 optional_fields["timestamp"] = str(epoch_time) 1104 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", 1105 local_time) 1106 1107 fields = [status_code, substr, operation] 1108 fields += ["%s=%s" % x for x in optional_fields.iteritems()] 1109 fields.append(status) 1110 1111 msg = '\t'.join(str(x) for x in fields) 1112 msg = '\t' * self._group_level + msg 1113 1114 msg_tag = "" 1115 if "." in self._log_filename: 1116 msg_tag = self._log_filename.split(".", 1)[1] 1117 1118 self.harness.test_status_detail(status_code, substr, operation, status, 1119 msg_tag) 1120 self.harness.test_status(msg, msg_tag) 1121 1122 # log to stdout (if enabled) 1123 logging.info(msg) 1124 1125 # log to the "root" status log 1126 status_file = os.path.join(self.resultdir, self._log_filename) 1127 open(status_file, "a").write(msg + "\n") 1128 1129 # log to the subdir status log (if subdir is set) 1130 if subdir: 1131 dir = os.path.join(self.resultdir, subdir) 1132 status_file = os.path.join(dir, self._DEFAULT_LOG_FILENAME) 1133 open(status_file, "a").write(msg + "\n") 1134 1135 1136class disk_usage_monitor: 1137 def __init__(self, logging_func, device, max_mb_per_hour): 1138 self.func = logging_func 1139 self.device = device 1140 self.max_mb_per_hour = max_mb_per_hour 1141 1142 1143 def start(self): 1144 self.initial_space = utils.freespace(self.device) 1145 self.start_time = time.time() 1146 1147 1148 def stop(self): 1149 # if no maximum usage rate was set, we don't need to 1150 # generate any warnings 1151 if not self.max_mb_per_hour: 1152 return 1153 1154 final_space = utils.freespace(self.device) 1155 used_space = self.initial_space - final_space 1156 stop_time = time.time() 1157 total_time = stop_time - self.start_time 1158 # round 
up the time to one minute, to keep extremely short 1159 # tests from generating false positives due to short, badly 1160 # timed bursts of activity 1161 total_time = max(total_time, 60.0) 1162 1163 # determine the usage rate 1164 bytes_per_sec = used_space / total_time 1165 mb_per_sec = bytes_per_sec / 1024**2 1166 mb_per_hour = mb_per_sec * 60 * 60 1167 1168 if mb_per_hour > self.max_mb_per_hour: 1169 msg = ("disk space on %s was consumed at a rate of %.2f MB/hour") 1170 msg %= (self.device, mb_per_hour) 1171 self.func(msg) 1172 1173 1174 @classmethod 1175 def watch(cls, *monitor_args, **monitor_dargs): 1176 """ Generic decorator to wrap a function call with the 1177 standard create-monitor -> start -> call -> stop idiom.""" 1178 def decorator(func): 1179 def watched_func(*args, **dargs): 1180 monitor = cls(*monitor_args, **monitor_dargs) 1181 monitor.start() 1182 try: 1183 func(*args, **dargs) 1184 finally: 1185 monitor.stop() 1186 return watched_func 1187 return decorator 1188 1189 1190def runjob(control, drop_caches, options): 1191 """ 1192 Run a job using the given control file. 1193 1194 This is the main interface to this module. 1195 1196 @see base_job.__init__ for parameter info. 1197 """ 1198 control = os.path.abspath(control) 1199 state = control + '.state' 1200 # Ensure state file is cleaned up before the job starts to run if autotest 1201 # is not running with the --continue flag 1202 if not options.cont and os.path.isfile(state): 1203 logging.debug('Cleaning up previously found state file') 1204 os.remove(state) 1205 1206 # instantiate the job object ready for the control file. 1207 myjob = None 1208 try: 1209 # Check that the control file is valid 1210 if not os.path.exists(control): 1211 raise error.JobError(control + ": control file not found") 1212 1213 # When continuing, the job is complete when there is no 1214 # state file, ensure we don't try and continue. 
1215 if options.cont and not os.path.exists(state): 1216 raise error.JobComplete("all done") 1217 1218 myjob = job(control=control, drop_caches=drop_caches, options=options) 1219 1220 # Load in the users control file, may do any one of: 1221 # 1) execute in toto 1222 # 2) define steps, and select the first via next_step() 1223 myjob.step_engine() 1224 1225 except error.JobContinue: 1226 sys.exit(5) 1227 1228 except error.JobComplete: 1229 sys.exit(1) 1230 1231 except error.JobError, instance: 1232 logging.error("JOB ERROR: " + instance.args[0]) 1233 if myjob: 1234 command = None 1235 if len(instance.args) > 1: 1236 command = instance.args[1] 1237 myjob.record('ABORT', None, command, instance.args[0]) 1238 myjob._decrement_group_level() 1239 myjob.record('END ABORT', None, None, instance.args[0]) 1240 assert (myjob._group_level == 0), ('myjob._group_level must be 0,' 1241 ' not %d' % myjob._group_level) 1242 myjob.complete(1) 1243 else: 1244 sys.exit(1) 1245 1246 except Exception, e: 1247 # NOTE: job._run_step_fn and job.step_engine will turn things into 1248 # a JobError for us. If we get here, its likely an autotest bug. 1249 msg = str(e) + '\n' + traceback.format_exc() 1250 logging.critical("JOB ERROR (autotest bug?): " + msg) 1251 if myjob: 1252 myjob._decrement_group_level() 1253 myjob.record('END ABORT', None, None, msg) 1254 assert(myjob._group_level == 0) 1255 myjob.complete(1) 1256 else: 1257 sys.exit(1) 1258 1259 # If we get here, then we assume the job is complete and good. 1260 myjob._decrement_group_level() 1261 myjob.record('END GOOD', None, None) 1262 assert(myjob._group_level == 0) 1263 1264 myjob.complete(0) 1265 1266 1267site_job = utils.import_site_class( 1268 __file__, "autotest_lib.client.bin.site_job", "site_job", base_client_job) 1269 1270class job(site_job): 1271 pass 1272