job.py revision 861b2d54aec24228cdb3895dbc40062cb40cb2ad
1"""The main job wrapper 2 3This is the core infrastructure. 4 5Copyright Andy Whitcroft, Martin J. Bligh 2006 6""" 7 8import copy, os, platform, re, shutil, sys, time, traceback, types, glob 9import logging, getpass, errno, weakref 10import cPickle as pickle 11from autotest_lib.client.bin import client_logging_config 12from autotest_lib.client.bin import utils, parallel, kernel, xen 13from autotest_lib.client.bin import profilers, boottool, harness 14from autotest_lib.client.bin import config, sysinfo, test, local_host 15from autotest_lib.client.bin import partition as partition_lib 16from autotest_lib.client.common_lib import base_job 17from autotest_lib.client.common_lib import error, barrier, log, logging_manager 18from autotest_lib.client.common_lib import base_packages, packages 19from autotest_lib.client.common_lib import global_config 20 21 22LAST_BOOT_TAG = object() 23JOB_PREAMBLE = """ 24from autotest_lib.client.common_lib.error import * 25from autotest_lib.client.bin.utils import * 26""" 27 28 29class StepError(error.AutotestError): 30 pass 31 32class NotAvailableError(error.AutotestError): 33 pass 34 35 36 37def _run_test_complete_on_exit(f): 38 """Decorator for job methods that automatically calls 39 self.harness.run_test_complete when the method exits, if appropriate.""" 40 def wrapped(self, *args, **dargs): 41 try: 42 return f(self, *args, **dargs) 43 finally: 44 if self._logger.global_filename == 'status': 45 self.harness.run_test_complete() 46 if self.drop_caches: 47 logging.debug("Dropping caches") 48 utils.drop_caches() 49 wrapped.__name__ = f.__name__ 50 wrapped.__doc__ = f.__doc__ 51 wrapped.__dict__.update(f.__dict__) 52 return wrapped 53 54 55class status_indenter(base_job.status_indenter): 56 """Provide a status indenter that is backed by job._record_prefix.""" 57 def __init__(self, job): 58 self.job = weakref.proxy(job) # avoid a circular reference 59 60 61 @property 62 def indent(self): 63 return self.job._record_indent 64 65 66 def increment(self): 67 self.job._record_indent += 1 68 69 70 def decrement(self): 71 self.job._record_indent -= 1 72 73 74class base_client_job(base_job.base_job): 75 """The client-side concrete implementation of base_job. 76 77 Optional properties provided by this implementation: 78 control 79 bootloader 80 harness 81 """ 82 83 _WARNING_DISABLE_DELAY = 5 84 85 # _record_indent is a persistent property, but only on the client 86 _job_state = base_job.base_job._job_state 87 _record_indent = _job_state.property_factory( 88 '_state', '_record_indent', 0, namespace='client') 89 _max_disk_usage_rate = _job_state.property_factory( 90 '_state', '_max_disk_usage_rate', 0.0, namespace='client') 91 92 93 def __init__(self, control, options, drop_caches=True, 94 extra_copy_cmdline=None): 95 """ 96 Prepare a client side job object. 97 98 @param control: The control file (pathname of). 99 @param options: an object which includes: 100 jobtag: The job tag string (eg "default"). 101 cont: If this is the continuation of this job. 102 harness_type: An alternative server harness. [None] 103 use_external_logging: If true, the enable_external_logging 104 method will be called during construction. [False] 105 @param drop_caches: If true, utils.drop_caches() is called before and 106 between all tests. [True] 107 @param extra_copy_cmdline: list of additional /proc/cmdline arguments to 108 copy from the running kernel to all the installed kernels with 109 this job 110 """ 111 super(base_client_job, self).__init__(options=options) 112 self._pre_record_init(control, options) 113 try: 114 self._post_record_init(control, options, drop_caches, 115 extra_copy_cmdline) 116 except Exception, err: 117 self.record( 118 'ABORT', None, None,'client.bin.job.__init__ failed: %s' % 119 str(err)) 120 raise 121 122 123 @classmethod 124 def _get_environ_autodir(cls): 125 return os.environ['AUTODIR'] 126 127 128 @classmethod 129 def _find_base_directories(cls): 130 """ 131 Determine locations of autodir and clientdir (which are the same) 132 using os.environ. Serverdir does not exist in this context. 133 """ 134 autodir = clientdir = cls._get_environ_autodir() 135 return autodir, clientdir, None 136 137 138 @classmethod 139 def _parse_args(cls, args): 140 return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args) 141 142 143 def _find_resultdir(self, options): 144 """ 145 Determine the directory for storing results. On a client this is 146 always <autodir>/results/<tag>, where tag is passed in on the command 147 line as an option. 148 """ 149 return os.path.join(self.autodir, 'results', options.tag) 150 151 152 def _get_status_logger(self): 153 """Return a reference to the status logger.""" 154 return self._logger 155 156 157 def _pre_record_init(self, control, options): 158 """ 159 Initialization function that should peform ONLY the required 160 setup so that the self.record() method works. 161 162 As of now self.record() needs self.resultdir, self._group_level, 163 self.harness and of course self._logger. 164 """ 165 if not options.cont: 166 self._cleanup_debugdir_files() 167 self._cleanup_results_dir() 168 169 logging_manager.configure_logging( 170 client_logging_config.ClientLoggingConfig(), 171 results_dir=self.resultdir, 172 verbose=options.verbose) 173 logging.info('Writing results to %s', self.resultdir) 174 175 # init_group_level needs the state 176 self.control = os.path.realpath(control) 177 self._is_continuation = options.cont 178 self._current_step_ancestry = [] 179 self._next_step_index = 0 180 self._load_state() 181 182 # harness is chosen by following rules: 183 # 1. explicitly specified via command line 184 # 2. harness stored in state file (if continuing job '-c') 185 # 3. default harness 186 selected_harness = None 187 if options.harness: 188 selected_harness = options.harness 189 self._state.set('client', 'harness', selected_harness) 190 else: 191 stored_harness = self._state.get('client', 'harness', None) 192 if stored_harness: 193 selected_harness = stored_harness 194 195 self.harness = harness.select(selected_harness, self) 196 197 # set up the status logger 198 def client_job_record_hook(entry): 199 msg_tag = '' 200 if '.' in self._logger.global_filename: 201 msg_tag = self._logger.global_filename.split('.', 1)[1] 202 # send the entry to the job harness 203 message = '\n'.join([entry.message] + entry.extra_message_lines) 204 rendered_entry = self._logger.render_entry(entry) 205 self.harness.test_status_detail(entry.status_code, entry.subdir, 206 entry.operation, message, msg_tag) 207 self.harness.test_status(rendered_entry, msg_tag) 208 # send the entry to stdout, if it's enabled 209 logging.info(rendered_entry) 210 self._logger = base_job.status_logger( 211 self, status_indenter(self), record_hook=client_job_record_hook, 212 tap_writer=self._tap) 213 214 def _post_record_init(self, control, options, drop_caches, 215 extra_copy_cmdline): 216 """ 217 Perform job initialization not required by self.record(). 218 """ 219 self._init_drop_caches(drop_caches) 220 221 self._init_packages() 222 223 self.sysinfo = sysinfo.sysinfo(self.resultdir) 224 self._load_sysinfo_state() 225 226 if not options.cont: 227 download = os.path.join(self.testdir, 'download') 228 if not os.path.exists(download): 229 os.mkdir(download) 230 231 shutil.copyfile(self.control, 232 os.path.join(self.resultdir, 'control')) 233 234 self.control = control 235 236 self.logging = logging_manager.get_logging_manager( 237 manage_stdout_and_stderr=True, redirect_fds=True) 238 self.logging.start_logging() 239 240 self._config = config.config(self) 241 self.profilers = profilers.profilers(self) 242 243 self._init_bootloader() 244 245 self.machines = [options.hostname] 246 self.hosts = set([local_host.LocalHost(hostname=options.hostname, 247 bootloader=self.bootloader)]) 248 249 self.args = [] 250 if options.args: 251 self.args = self._parse_args(options.args) 252 253 if options.user: 254 self.user = options.user 255 else: 256 self.user = getpass.getuser() 257 258 self.sysinfo.log_per_reboot_data() 259 260 if not options.cont: 261 self.record('START', None, None) 262 263 self.harness.run_start() 264 265 if options.log: 266 self.enable_external_logging() 267 268 self._init_cmdline(extra_copy_cmdline) 269 270 self.num_tests_run = None 271 self.num_tests_failed = None 272 273 self.warning_loggers = None 274 self.warning_manager = None 275 276 277 def _init_drop_caches(self, drop_caches): 278 """ 279 Perform the drop caches initialization. 280 """ 281 self.drop_caches_between_iterations = ( 282 global_config.global_config.get_config_value('CLIENT', 283 'drop_caches_between_iterations', 284 type=bool, default=True)) 285 self.drop_caches = drop_caches 286 if self.drop_caches: 287 logging.debug("Dropping caches") 288 utils.drop_caches() 289 290 291 def _init_bootloader(self): 292 """ 293 Perform boottool initialization. 294 """ 295 tool = self.config_get('boottool.executable') 296 self.bootloader = boottool.boottool(tool) 297 298 299 def _init_packages(self): 300 """ 301 Perform the packages support initialization. 302 """ 303 self.pkgmgr = packages.PackageManager( 304 self.autodir, run_function_dargs={'timeout':3600}) 305 306 307 def _init_cmdline(self, extra_copy_cmdline): 308 """ 309 Initialize default cmdline for booted kernels in this job. 310 """ 311 copy_cmdline = set(['console']) 312 if extra_copy_cmdline is not None: 313 copy_cmdline.update(extra_copy_cmdline) 314 315 # extract console= and other args from cmdline and add them into the 316 # base args that we use for all kernels we install 317 cmdline = utils.read_one_line('/proc/cmdline') 318 kernel_args = [] 319 for karg in cmdline.split(): 320 for param in copy_cmdline: 321 if karg.startswith(param) and \ 322 (len(param) == len(karg) or karg[len(param)] == '='): 323 kernel_args.append(karg) 324 self.config_set('boot.default_args', ' '.join(kernel_args)) 325 326 327 def _cleanup_results_dir(self): 328 """Delete everything in resultsdir""" 329 assert os.path.exists(self.resultdir) 330 list_files = glob.glob('%s/*' % self.resultdir) 331 for f in list_files: 332 if os.path.isdir(f): 333 shutil.rmtree(f) 334 elif os.path.isfile(f): 335 os.remove(f) 336 337 338 def _cleanup_debugdir_files(self): 339 """ 340 Delete any leftover debugdir files 341 """ 342 list_files = glob.glob("/tmp/autotest_results_dir.*") 343 for f in list_files: 344 os.remove(f) 345 346 347 def disable_warnings(self, warning_type): 348 self.record("INFO", None, None, 349 "disabling %s warnings" % warning_type, 350 {"warnings.disable": warning_type}) 351 time.sleep(self._WARNING_DISABLE_DELAY) 352 353 354 def enable_warnings(self, warning_type): 355 time.sleep(self._WARNING_DISABLE_DELAY) 356 self.record("INFO", None, None, 357 "enabling %s warnings" % warning_type, 358 {"warnings.enable": warning_type}) 359 360 361 def monitor_disk_usage(self, max_rate): 362 """\ 363 Signal that the job should monitor disk space usage on / 364 and generate a warning if a test uses up disk space at a 365 rate exceeding 'max_rate'. 366 367 Parameters: 368 max_rate - the maximium allowed rate of disk consumption 369 during a test, in MB/hour, or 0 to indicate 370 no limit. 371 """ 372 self._max_disk_usage_rate = max_rate 373 374 375 def relative_path(self, path): 376 """\ 377 Return a patch relative to the job results directory 378 """ 379 head = len(self.resultdir) + 1 # remove the / inbetween 380 return path[head:] 381 382 383 def control_get(self): 384 return self.control 385 386 387 def control_set(self, control): 388 self.control = os.path.abspath(control) 389 390 391 def harness_select(self, which): 392 self.harness = harness.select(which, self) 393 394 395 def config_set(self, name, value): 396 self._config.set(name, value) 397 398 399 def config_get(self, name): 400 return self._config.get(name) 401 402 403 def setup_dirs(self, results_dir, tmp_dir): 404 if not tmp_dir: 405 tmp_dir = os.path.join(self.tmpdir, 'build') 406 if not os.path.exists(tmp_dir): 407 os.mkdir(tmp_dir) 408 if not os.path.isdir(tmp_dir): 409 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 410 raise ValueError(e_msg) 411 412 # We label the first build "build" and then subsequent ones 413 # as "build.2", "build.3", etc. Whilst this is a little bit 414 # inconsistent, 99.9% of jobs will only have one build 415 # (that's not done as kernbench, sparse, or buildtest), 416 # so it works out much cleaner. One of life's comprimises. 417 if not results_dir: 418 results_dir = os.path.join(self.resultdir, 'build') 419 i = 2 420 while os.path.exists(results_dir): 421 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 422 i += 1 423 if not os.path.exists(results_dir): 424 os.mkdir(results_dir) 425 426 return (results_dir, tmp_dir) 427 428 429 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 430 kjob = None ): 431 """Summon a xen object""" 432 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 433 build_dir = 'xen' 434 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, 435 leave, kjob) 436 437 438 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 439 """Summon a kernel object""" 440 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 441 build_dir = 'linux' 442 return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir, 443 build_dir, leave) 444 445 446 def barrier(self, *args, **kwds): 447 """Create a barrier object""" 448 return barrier.barrier(*args, **kwds) 449 450 451 def install_pkg(self, name, pkg_type, install_dir): 452 ''' 453 This method is a simple wrapper around the actual package 454 installation method in the Packager class. This is used 455 internally by the profilers, deps and tests code. 456 name : name of the package (ex: sleeptest, dbench etc.) 457 pkg_type : Type of the package (ex: test, dep etc.) 458 install_dir : The directory in which the source is actually 459 untarred into. (ex: client/profilers/<name> for profilers) 460 ''' 461 if self.pkgmgr.repositories: 462 self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir) 463 464 465 def add_repository(self, repo_urls): 466 ''' 467 Adds the repository locations to the job so that packages 468 can be fetched from them when needed. The repository list 469 needs to be a string list 470 Ex: job.add_repository(['http://blah1','http://blah2']) 471 ''' 472 for repo_url in repo_urls: 473 self.pkgmgr.add_repository(repo_url) 474 475 # Fetch the packages' checksum file that contains the checksums 476 # of all the packages if it is not already fetched. The checksum 477 # is always fetched whenever a job is first started. This 478 # is not done in the job's constructor as we don't have the list of 479 # the repositories there (and obviously don't care about this file 480 # if we are not using the repos) 481 try: 482 checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir, 483 base_packages.CHECKSUM_FILE) 484 self.pkgmgr.fetch_pkg(base_packages.CHECKSUM_FILE, 485 checksum_file_path, use_checksum=False) 486 except error.PackageFetchError: 487 # packaging system might not be working in this case 488 # Silently fall back to the normal case 489 pass 490 491 492 def require_gcc(self): 493 """ 494 Test whether gcc is installed on the machine. 495 """ 496 # check if gcc is installed on the system. 497 try: 498 utils.system('which gcc') 499 except error.CmdError, e: 500 raise NotAvailableError('gcc is required by this job and is ' 501 'not available on the system') 502 503 504 def setup_dep(self, deps): 505 """Set up the dependencies for this test. 506 deps is a list of libraries required for this test. 507 """ 508 # Fetch the deps from the repositories and set them up. 509 for dep in deps: 510 dep_dir = os.path.join(self.autodir, 'deps', dep) 511 # Search for the dependency in the repositories if specified, 512 # else check locally. 513 try: 514 self.install_pkg(dep, 'dep', dep_dir) 515 except error.PackageInstallError: 516 # see if the dep is there locally 517 pass 518 519 # dep_dir might not exist if it is not fetched from the repos 520 if not os.path.exists(dep_dir): 521 raise error.TestError("Dependency %s does not exist" % dep) 522 523 os.chdir(dep_dir) 524 if execfile('%s.py' % dep, {}) is None: 525 logging.info('Dependency %s successfuly built', dep) 526 527 528 def _runtest(self, url, tag, args, dargs): 529 try: 530 l = lambda : test.runtest(self, url, tag, args, dargs) 531 pid = parallel.fork_start(self.resultdir, l) 532 parallel.fork_waitfor(self.resultdir, pid) 533 except error.TestBaseException: 534 # These are already classified with an error type (exit_status) 535 raise 536 except error.JobError: 537 raise # Caught further up and turned into an ABORT. 538 except Exception, e: 539 # Converts all other exceptions thrown by the test regardless 540 # of phase into a TestError(TestBaseException) subclass that 541 # reports them with their full stack trace. 542 raise error.UnhandledTestError(e) 543 544 545 @_run_test_complete_on_exit 546 def run_test(self, url, *args, **dargs): 547 """ 548 Summon a test object and run it. 549 550 @param url A url that identifies the test to run. 551 @param tag An optional keyword argument that will be added to the 552 test and subdir name. 553 @param subdir_tag An optional keyword argument that will be added 554 to the subdir name. 555 556 @returns True if the test passes, False otherwise. 557 """ 558 group, testname = self.pkgmgr.get_package_name(url, 'test') 559 testname, subdir, tag = self._build_tagged_test_name(testname, dargs) 560 outputdir = self._make_test_outputdir(subdir) 561 562 def log_warning(reason): 563 self.record("WARN", subdir, testname, reason) 564 @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate) 565 def group_func(): 566 try: 567 self._runtest(url, tag, args, dargs) 568 except error.TestBaseException, detail: 569 # The error is already classified, record it properly. 570 self.record(detail.exit_status, subdir, testname, str(detail)) 571 raise 572 else: 573 self.record('GOOD', subdir, testname, 'completed successfully') 574 575 try: 576 self._rungroup(subdir, testname, group_func) 577 return True 578 except error.TestBaseException: 579 return False 580 # Any other exception here will be given to the caller 581 # 582 # NOTE: The only exception possible from the control file here 583 # is error.JobError as _runtest() turns all others into an 584 # UnhandledTestError that is caught above. 585 586 587 def _rungroup(self, subdir, testname, function, *args, **dargs): 588 """\ 589 subdir: 590 name of the group 591 testname: 592 name of the test to run, or support step 593 function: 594 subroutine to run 595 *args: 596 arguments for the function 597 598 Returns the result of the passed in function 599 """ 600 601 try: 602 self.record('START', subdir, testname) 603 self._state.set('client', 'unexpected_reboot', (subdir, testname)) 604 result = function(*args, **dargs) 605 self.record('END GOOD', subdir, testname) 606 return result 607 except error.TestBaseException, e: 608 self.record('END %s' % e.exit_status, subdir, testname) 609 raise 610 except error.JobError, e: 611 self.record('END ABORT', subdir, testname) 612 raise 613 except Exception, e: 614 # This should only ever happen due to a bug in the given 615 # function's code. The common case of being called by 616 # run_test() will never reach this. If a control file called 617 # run_group() itself, bugs in its function will be caught 618 # here. 619 err_msg = str(e) + '\n' + traceback.format_exc() 620 self.record('END ERROR', subdir, testname, err_msg) 621 raise 622 finally: 623 self._state.discard('client', 'unexpected_reboot') 624 625 626 def run_group(self, function, tag=None, **dargs): 627 """ 628 Run a function nested within a group level. 629 630 function: 631 Callable to run. 632 tag: 633 An optional tag name for the group. If None (default) 634 function.__name__ will be used. 635 **dargs: 636 Named arguments for the function. 637 """ 638 if tag: 639 name = tag 640 else: 641 name = function.__name__ 642 643 try: 644 return self._rungroup(subdir=None, testname=name, 645 function=function, **dargs) 646 except (SystemExit, error.TestBaseException): 647 raise 648 # If there was a different exception, turn it into a TestError. 649 # It will be caught by step_engine or _run_step_fn. 650 except Exception, e: 651 raise error.UnhandledTestError(e) 652 653 654 def cpu_count(self): 655 return utils.count_cpus() # use total system count 656 657 658 def start_reboot(self): 659 self.record('START', None, 'reboot') 660 self.record('GOOD', None, 'reboot.start') 661 662 663 def _record_reboot_failure(self, subdir, operation, status, 664 running_id=None): 665 self.record("ABORT", subdir, operation, status) 666 if not running_id: 667 running_id = utils.running_os_ident() 668 kernel = {"kernel": running_id.split("::")[0]} 669 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) 670 671 672 def _check_post_reboot(self, subdir, running_id=None): 673 """ 674 Function to perform post boot checks such as if the system configuration 675 has changed across reboots (specifically, CPUs and partitions). 676 677 @param subdir: The subdir to use in the job.record call. 678 @param running_id: An optional running_id to include in the reboot 679 failure log message 680 681 @raise JobError: Raised if the current configuration does not match the 682 pre-reboot configuration. 683 """ 684 # check to see if any partitions have changed 685 partition_list = partition_lib.get_partition_list(self, 686 exclude_swap=False) 687 mount_info = partition_lib.get_mount_info(partition_list) 688 old_mount_info = self._state.get('client', 'mount_info') 689 if mount_info != old_mount_info: 690 new_entries = mount_info - old_mount_info 691 old_entries = old_mount_info - mount_info 692 description = ("mounted partitions are different after reboot " 693 "(old entries: %s, new entries: %s)" % 694 (old_entries, new_entries)) 695 self._record_reboot_failure(subdir, "reboot.verify_config", 696 description, running_id=running_id) 697 raise error.JobError("Reboot failed: %s" % description) 698 699 # check to see if any CPUs have changed 700 cpu_count = utils.count_cpus() 701 old_count = self._state.get('client', 'cpu_count') 702 if cpu_count != old_count: 703 description = ('Number of CPUs changed after reboot ' 704 '(old count: %d, new count: %d)' % 705 (old_count, cpu_count)) 706 self._record_reboot_failure(subdir, 'reboot.verify_config', 707 description, running_id=running_id) 708 raise error.JobError('Reboot failed: %s' % description) 709 710 711 def end_reboot(self, subdir, kernel, patches, running_id=None): 712 self._check_post_reboot(subdir, running_id=running_id) 713 714 # strip ::<timestamp> from the kernel version if present 715 kernel = kernel.split("::")[0] 716 kernel_info = {"kernel": kernel} 717 for i, patch in enumerate(patches): 718 kernel_info["patch%d" % i] = patch 719 self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info) 720 721 722 def end_reboot_and_verify(self, expected_when, expected_id, subdir, 723 type='src', patches=[]): 724 """ Check the passed kernel identifier against the command line 725 and the running kernel, abort the job on missmatch. """ 726 727 logging.info("POST BOOT: checking booted kernel " 728 "mark=%d identity='%s' type='%s'", 729 expected_when, expected_id, type) 730 731 running_id = utils.running_os_ident() 732 733 cmdline = utils.read_one_line("/proc/cmdline") 734 735 find_sum = re.compile(r'.*IDENT=(\d+)') 736 m = find_sum.match(cmdline) 737 cmdline_when = -1 738 if m: 739 cmdline_when = int(m.groups()[0]) 740 741 # We have all the facts, see if they indicate we 742 # booted the requested kernel or not. 743 bad = False 744 if (type == 'src' and expected_id != running_id or 745 type == 'rpm' and 746 not running_id.startswith(expected_id + '::')): 747 logging.error("Kernel identifier mismatch") 748 bad = True 749 if expected_when != cmdline_when: 750 logging.error("Kernel command line mismatch") 751 bad = True 752 753 if bad: 754 logging.error(" Expected Ident: " + expected_id) 755 logging.error(" Running Ident: " + running_id) 756 logging.error(" Expected Mark: %d", expected_when) 757 logging.error("Command Line Mark: %d", cmdline_when) 758 logging.error(" Command Line: " + cmdline) 759 760 self._record_reboot_failure(subdir, "reboot.verify", "boot failure", 761 running_id=running_id) 762 raise error.JobError("Reboot returned with the wrong kernel") 763 764 self.record('GOOD', subdir, 'reboot.verify', 765 utils.running_os_full_version()) 766 self.end_reboot(subdir, expected_id, patches, running_id=running_id) 767 768 769 def partition(self, device, loop_size=0, mountpoint=None): 770 """ 771 Work with a machine partition 772 773 @param device: e.g. /dev/sda2, /dev/sdb1 etc... 774 @param mountpoint: Specify a directory to mount to. If not specified 775 autotest tmp directory will be used. 776 @param loop_size: Size of loopback device (in MB). Defaults to 0. 777 778 @return: A L{client.bin.partition.partition} object 779 """ 780 781 if not mountpoint: 782 mountpoint = self.tmpdir 783 return partition_lib.partition(self, device, loop_size, mountpoint) 784 785 @utils.deprecated 786 def filesystem(self, device, mountpoint=None, loop_size=0): 787 """ Same as partition 788 789 @deprecated: Use partition method instead 790 """ 791 return self.partition(device, loop_size, mountpoint) 792 793 794 def enable_external_logging(self): 795 pass 796 797 798 def disable_external_logging(self): 799 pass 800 801 802 def reboot_setup(self): 803 # save the partition list and mount points, as well as the cpu count 804 partition_list = partition_lib.get_partition_list(self, 805 exclude_swap=False) 806 mount_info = partition_lib.get_mount_info(partition_list) 807 self._state.set('client', 'mount_info', mount_info) 808 self._state.set('client', 'cpu_count', utils.count_cpus()) 809 810 811 def reboot(self, tag=LAST_BOOT_TAG): 812 if tag == LAST_BOOT_TAG: 813 tag = self.last_boot_tag 814 else: 815 self.last_boot_tag = tag 816 817 self.reboot_setup() 818 self.harness.run_reboot() 819 default = self.config_get('boot.set_default') 820 if default: 821 self.bootloader.set_default(tag) 822 else: 823 self.bootloader.boot_once(tag) 824 825 # HACK: using this as a module sometimes hangs shutdown, so if it's 826 # installed unload it first 827 utils.system("modprobe -r netconsole", ignore_status=True) 828 829 # sync first, so that a sync during shutdown doesn't time out 830 utils.system("sync; sync", ignore_status=True) 831 832 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 833 self.quit() 834 835 836 def noop(self, text): 837 logging.info("job: noop: " + text) 838 839 840 @_run_test_complete_on_exit 841 def parallel(self, *tasklist): 842 """Run tasks in parallel""" 843 844 pids = [] 845 old_log_filename = self._logger.global_filename 846 for i, task in enumerate(tasklist): 847 assert isinstance(task, (tuple, list)) 848 self._logger.global_filename = old_log_filename + (".%d" % i) 849 def task_func(): 850 # stub out _record_indent with a process-local one 851 base_record_indent = self._record_indent 852 proc_local = self._job_state.property_factory( 853 '_state', '_record_indent.%d' % os.getpid(), 854 base_record_indent, namespace='client') 855 self.__class__._record_indent = proc_local 856 task[0](*task[1:]) 857 pids.append(parallel.fork_start(self.resultdir, task_func)) 858 859 old_log_path = os.path.join(self.resultdir, old_log_filename) 860 old_log = open(old_log_path, "a") 861 exceptions = [] 862 for i, pid in enumerate(pids): 863 # wait for the task to finish 864 try: 865 parallel.fork_waitfor(self.resultdir, pid) 866 except Exception, e: 867 exceptions.append(e) 868 # copy the logs from the subtask into the main log 869 new_log_path = old_log_path + (".%d" % i) 870 if os.path.exists(new_log_path): 871 new_log = open(new_log_path) 872 old_log.write(new_log.read()) 873 new_log.close() 874 old_log.flush() 875 os.remove(new_log_path) 876 old_log.close() 877 878 self._logger.global_filename = old_log_filename 879 880 # handle any exceptions raised by the parallel tasks 881 if exceptions: 882 msg = "%d task(s) failed in job.parallel" % len(exceptions) 883 raise error.JobError(msg) 884 885 886 def quit(self): 887 # XXX: should have a better name. 888 self.harness.run_pause() 889 raise error.JobContinue("more to come") 890 891 892 def complete(self, status): 893 """Write pending TAP reports, clean up, and exit""" 894 # write out TAP reports 895 if self._tap.do_tap_report: 896 self._tap.write() 897 self._tap._write_tap_archive() 898 899 # We are about to exit 'complete' so clean up the control file. 900 dest = os.path.join(self.resultdir, os.path.basename(self._state_file)) 901 shutil.move(self._state_file, dest) 902 903 self.harness.run_complete() 904 self.disable_external_logging() 905 sys.exit(status) 906 907 908 def _load_state(self): 909 # grab any initial state and set up $CONTROL.state as the backing file 910 init_state_file = self.control + '.init.state' 911 self._state_file = self.control + '.state' 912 if os.path.exists(init_state_file): 913 shutil.move(init_state_file, self._state_file) 914 self._state.set_backing_file(self._state_file) 915 916 # initialize the state engine, if necessary 917 has_steps = self._state.has('client', 'steps') 918 if not self._is_continuation and has_steps: 919 raise RuntimeError('Loaded state can only contain client.steps if ' 920 'this is a continuation') 921 922 if not has_steps: 923 logging.info('Initializing the state engine') 924 self._state.set('client', 'steps', []) 925 926 927 def __create_step_tuple(self, fn, args, dargs): 928 # Legacy code passes in an array where the first arg is 929 # the function or its name. 930 if isinstance(fn, list): 931 assert(len(args) == 0) 932 assert(len(dargs) == 0) 933 args = fn[1:] 934 fn = fn[0] 935 # Pickling actual functions is hairy, thus we have to call 936 # them by name. Unfortunately, this means only functions 937 # defined globally can be used as a next step. 938 if callable(fn): 939 fn = fn.__name__ 940 if not isinstance(fn, types.StringTypes): 941 raise StepError("Next steps must be functions or " 942 "strings containing the function name") 943 ancestry = copy.copy(self._current_step_ancestry) 944 return (ancestry, fn, args, dargs) 945 946 947 def next_step_append(self, fn, *args, **dargs): 948 """Define the next step and place it at the end""" 949 steps = self._state.get('client', 'steps') 950 steps.append(self.__create_step_tuple(fn, args, dargs)) 951 self._state.set('client', 'steps', steps) 952 953 954 def next_step(self, fn, *args, **dargs): 955 """Create a new step and place it after any steps added 956 while running the current step but before any steps added in 957 previous steps""" 958 steps = self._state.get('client', 'steps') 959 steps.insert(self._next_step_index, 960 self.__create_step_tuple(fn, args, dargs)) 961 self._next_step_index += 1 962 self._state.set('client', 'steps', steps) 963 964 965 def next_step_prepend(self, fn, *args, **dargs): 966 """Insert a new step, executing first""" 967 steps = self._state.get('client', 'steps') 968 steps.insert(0, self.__create_step_tuple(fn, args, dargs)) 969 self._next_step_index += 1 970 self._state.set('client', 'steps', steps) 971 972 973 974 def _run_step_fn(self, local_vars, fn, args, dargs): 975 """Run a (step) function within the given context""" 976 977 local_vars['__args'] = args 978 local_vars['__dargs'] = dargs 979 try: 980 exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars) 981 return local_vars['__ret'] 982 except SystemExit: 983 raise # Send error.JobContinue and JobComplete on up to runjob. 984 except error.TestNAError, detail: 985 self.record(detail.exit_status, None, fn, str(detail)) 986 except Exception, detail: 987 raise error.UnhandledJobError(detail) 988 989 990 def _create_frame(self, global_vars, ancestry, fn_name): 991 """Set up the environment like it would have been when this 992 function was first defined. 993 994 Child step engine 'implementations' must have 'return locals()' 995 at end end of their steps. Because of this, we can call the 996 parent function and get back all child functions (i.e. those 997 defined within it). 998 999 Unfortunately, the call stack of the function calling 1000 job.next_step might have been deeper than the function it 1001 added. In order to make sure that the environment is what it 1002 should be, we need to then pop off the frames we built until 1003 we find the frame where the function was first defined.""" 1004 1005 # The copies ensure that the parent frames are not modified 1006 # while building child frames. This matters if we then 1007 # pop some frames in the next part of this function. 1008 current_frame = copy.copy(global_vars) 1009 frames = [current_frame] 1010 for steps_fn_name in ancestry: 1011 ret = self._run_step_fn(current_frame, steps_fn_name, [], {}) 1012 current_frame = copy.copy(ret) 1013 frames.append(current_frame) 1014 1015 # Walk up the stack frames until we find the place fn_name was defined. 1016 while len(frames) > 2: 1017 if fn_name not in frames[-2]: 1018 break 1019 if frames[-2][fn_name] != frames[-1][fn_name]: 1020 break 1021 frames.pop() 1022 ancestry.pop() 1023 1024 return (frames[-1], ancestry) 1025 1026 1027 def _add_step_init(self, local_vars, current_function): 1028 """If the function returned a dictionary that includes a 1029 function named 'step_init', prepend it to our list of steps. 1030 This will only get run the first time a function with a nested 1031 use of the step engine is run.""" 1032 1033 if (isinstance(local_vars, dict) and 1034 'step_init' in local_vars and 1035 callable(local_vars['step_init'])): 1036 # The init step is a child of the function 1037 # we were just running. 1038 self._current_step_ancestry.append(current_function) 1039 self.next_step_prepend('step_init') 1040 1041 1042 def step_engine(self): 1043 """The multi-run engine used when the control file defines step_init. 1044 1045 Does the next step. 1046 """ 1047 1048 # Set up the environment and then interpret the control file. 1049 # Some control files will have code outside of functions, 1050 # which means we need to have our state engine initialized 1051 # before reading in the file. 1052 global_control_vars = {'job': self, 1053 'args': self.args} 1054 exec(JOB_PREAMBLE, global_control_vars, global_control_vars) 1055 try: 1056 execfile(self.control, global_control_vars, global_control_vars) 1057 except error.TestNAError, detail: 1058 self.record(detail.exit_status, None, self.control, str(detail)) 1059 except SystemExit: 1060 raise # Send error.JobContinue and JobComplete on up to runjob. 1061 except Exception, detail: 1062 # Syntax errors or other general Python exceptions coming out of 1063 # the top level of the control file itself go through here. 1064 raise error.UnhandledJobError(detail) 1065 1066 # If we loaded in a mid-job state file, then we presumably 1067 # know what steps we have yet to run. 1068 if not self._is_continuation: 1069 if 'step_init' in global_control_vars: 1070 self.next_step(global_control_vars['step_init']) 1071 else: 1072 # if last job failed due to unexpected reboot, record it as fail 1073 # so harness gets called 1074 last_job = self._state.get('client', 'unexpected_reboot', None) 1075 if last_job: 1076 subdir, testname = last_job 1077 self.record('FAIL', subdir, testname, 'unexpected reboot') 1078 self.record('END FAIL', subdir, testname) 1079 1080 # Iterate through the steps. If we reboot, we'll simply 1081 # continue iterating on the next step. 1082 while len(self._state.get('client', 'steps')) > 0: 1083 steps = self._state.get('client', 'steps') 1084 (ancestry, fn_name, args, dargs) = steps.pop(0) 1085 self._state.set('client', 'steps', steps) 1086 1087 self._next_step_index = 0 1088 ret = self._create_frame(global_control_vars, ancestry, fn_name) 1089 local_vars, self._current_step_ancestry = ret 1090 local_vars = self._run_step_fn(local_vars, fn_name, args, dargs) 1091 self._add_step_init(local_vars, fn_name) 1092 1093 1094 def add_sysinfo_command(self, command, logfile=None, on_every_test=False): 1095 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile), 1096 on_every_test) 1097 1098 1099 def add_sysinfo_logfile(self, file, on_every_test=False): 1100 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) 1101 1102 1103 def _add_sysinfo_loggable(self, loggable, on_every_test): 1104 if on_every_test: 1105 self.sysinfo.test_loggables.add(loggable) 1106 else: 1107 self.sysinfo.boot_loggables.add(loggable) 1108 self._save_sysinfo_state() 1109 1110 1111 def _load_sysinfo_state(self): 1112 state = self._state.get('client', 'sysinfo', None) 1113 if state: 1114 self.sysinfo.deserialize(state) 1115 1116 1117 def _save_sysinfo_state(self): 1118 state = self.sysinfo.serialize() 1119 self._state.set('client', 'sysinfo', state) 1120 1121 1122class disk_usage_monitor: 1123 def __init__(self, logging_func, device, max_mb_per_hour): 1124 self.func = logging_func 1125 self.device = device 1126 self.max_mb_per_hour = max_mb_per_hour 1127 1128 1129 def start(self): 1130 self.initial_space = utils.freespace(self.device) 1131 self.start_time = time.time() 1132 1133 1134 def stop(self): 1135 # if no maximum usage rate was set, we don't need to 1136 # generate any warnings 1137 if not self.max_mb_per_hour: 1138 return 1139 1140 final_space = utils.freespace(self.device) 1141 used_space = self.initial_space - final_space 1142 stop_time = time.time() 1143 total_time = stop_time - self.start_time 1144 # round up the time to one minute, to keep extremely short 1145 # tests from generating false positives due to short, badly 1146 # timed bursts of activity 1147 total_time = max(total_time, 60.0) 1148 1149 # determine the usage rate 1150 bytes_per_sec = used_space / total_time 1151 mb_per_sec = bytes_per_sec / 1024**2 1152 mb_per_hour = mb_per_sec * 60 * 60 1153 1154 if mb_per_hour > self.max_mb_per_hour: 1155 msg = ("disk space on %s was consumed at a rate of %.2f MB/hour") 1156 msg %= (self.device, mb_per_hour) 1157 self.func(msg) 1158 1159 1160 @classmethod 1161 def watch(cls, *monitor_args, **monitor_dargs): 1162 """ Generic decorator to wrap a function call with the 1163 standard create-monitor -> start -> call -> stop idiom.""" 1164 def decorator(func): 1165 def watched_func(*args, **dargs): 1166 monitor = cls(*monitor_args, **monitor_dargs) 1167 monitor.start() 1168 try: 1169 func(*args, **dargs) 1170 finally: 1171 monitor.stop() 1172 return watched_func 1173 return decorator 1174 1175 1176def runjob(control, drop_caches, options): 1177 """ 1178 Run a job using the given control file. 1179 1180 This is the main interface to this module. 1181 1182 @see base_job.__init__ for parameter info. 1183 """ 1184 control = os.path.abspath(control) 1185 state = control + '.state' 1186 # Ensure state file is cleaned up before the job starts to run if autotest 1187 # is not running with the --continue flag 1188 if not options.cont and os.path.isfile(state): 1189 logging.debug('Cleaning up previously found state file') 1190 os.remove(state) 1191 1192 # instantiate the job object ready for the control file. 1193 myjob = None 1194 try: 1195 # Check that the control file is valid 1196 if not os.path.exists(control): 1197 raise error.JobError(control + ": control file not found") 1198 1199 # When continuing, the job is complete when there is no 1200 # state file, ensure we don't try and continue. 1201 if options.cont and not os.path.exists(state): 1202 raise error.JobComplete("all done") 1203 1204 myjob = job(control=control, drop_caches=drop_caches, options=options) 1205 1206 # Load in the users control file, may do any one of: 1207 # 1) execute in toto 1208 # 2) define steps, and select the first via next_step() 1209 myjob.step_engine() 1210 1211 except error.JobContinue: 1212 sys.exit(5) 1213 1214 except error.JobComplete: 1215 sys.exit(1) 1216 1217 except error.JobError, instance: 1218 logging.error("JOB ERROR: " + str(instance)) 1219 if myjob: 1220 command = None 1221 if len(instance.args) > 1: 1222 command = instance.args[1] 1223 myjob.record('ABORT', None, command, str(instance)) 1224 myjob.record('END ABORT', None, None, str(instance)) 1225 assert myjob._record_indent == 0 1226 myjob.complete(1) 1227 else: 1228 sys.exit(1) 1229 1230 except Exception, e: 1231 # NOTE: job._run_step_fn and job.step_engine will turn things into 1232 # a JobError for us. If we get here, its likely an autotest bug. 1233 msg = str(e) + '\n' + traceback.format_exc() 1234 logging.critical("JOB ERROR (autotest bug?): " + msg) 1235 if myjob: 1236 myjob.record('END ABORT', None, None, msg) 1237 assert myjob._record_indent == 0 1238 myjob.complete(1) 1239 else: 1240 sys.exit(1) 1241 1242 # If we get here, then we assume the job is complete and good. 1243 myjob.record('END GOOD', None, None) 1244 assert myjob._record_indent == 0 1245 1246 myjob.complete(0) 1247 1248 1249site_job = utils.import_site_class( 1250 __file__, "autotest_lib.client.bin.site_job", "site_job", base_client_job) 1251 1252class job(site_job): 1253 pass 1254