job.py revision a700772e70548b10721ba709c7ec5194bb6d7597
1"""The main job wrapper 2 3This is the core infrastructure. 4 5Copyright Andy Whitcroft, Martin J. Bligh 2006 6""" 7 8import copy, os, platform, re, shutil, sys, time, traceback, types 9import cPickle as pickle 10from autotest_lib.client.bin import utils, parallel, kernel, xen 11from autotest_lib.client.bin import profilers, fd_stack, boottool, harness 12from autotest_lib.client.bin import config, sysinfo, cpuset, test, partition 13from autotest_lib.client.common_lib import error, barrier, log 14from autotest_lib.client.common_lib import packages, debug 15 16LAST_BOOT_TAG = object() 17NO_DEFAULT = object() 18JOB_PREAMBLE = """ 19from autotest_lib.client.common_lib.error import * 20from autotest_lib.client.bin.utils import * 21""" 22 23 24class StepError(error.AutotestError): 25 pass 26 27class NotAvailableError(error.AutotestError): 28 pass 29 30 31 32def _run_test_complete_on_exit(f): 33 """Decorator for job methods that automatically calls 34 self.harness.run_test_complete when the method exits, if appropriate.""" 35 def wrapped(self, *args, **dargs): 36 try: 37 return f(self, *args, **dargs) 38 finally: 39 if self.log_filename == self.DEFAULT_LOG_FILENAME: 40 self.harness.run_test_complete() 41 if self.drop_caches: 42 print "Dropping caches" 43 utils.drop_caches() 44 wrapped.__name__ = f.__name__ 45 wrapped.__doc__ = f.__doc__ 46 wrapped.__dict__.update(f.__dict__) 47 return wrapped 48 49 50class base_job(object): 51 """The actual job against which we do everything. 52 53 Properties: 54 autodir 55 The top level autotest directory (/usr/local/autotest). 56 Comes from os.environ['AUTODIR']. 57 bindir 58 <autodir>/bin/ 59 libdir 60 <autodir>/lib/ 61 testdir 62 <autodir>/tests/ 63 configdir 64 <autodir>/config/ 65 site_testdir 66 <autodir>/site_tests/ 67 profdir 68 <autodir>/profilers/ 69 tmpdir 70 <autodir>/tmp/ 71 pkgdir 72 <autodir>/packages/ 73 resultdir 74 <autodir>/results/<jobtag> 75 toolsdir 76 <autodir>/tools/ 77 stdout 78 fd_stack object for stdout 79 stderr 80 fd_stack object for stderr 81 profilers 82 the profilers object for this job 83 harness 84 the server harness object for this job 85 config 86 the job configuration for this job 87 drop_caches_between_iterations 88 drop the pagecache between each iteration 89 """ 90 91 DEFAULT_LOG_FILENAME = "status" 92 93 def __init__(self, control, jobtag, cont, harness_type=None, 94 use_external_logging=False, drop_caches=True): 95 """ 96 Prepare a client side job object. 97 98 Args: 99 control: The control file (pathname of). 100 jobtag: The job tag string (eg "default"). 101 cont: If this is the continuation of this job. 102 harness_type: An alternative server harness. [None] 103 use_external_logging: If true, the enable_external_logging 104 method will be called during construction. [False] 105 drop_caches: If true, utils.drop_caches() is 106 called before and between all tests. 
        self.drop_caches = drop_caches
        if self.drop_caches:
            print "Dropping caches"
            utils.drop_caches()
        self.drop_caches_between_iterations = False
        self.autodir = os.environ['AUTODIR']
        self.bindir = os.path.join(self.autodir, 'bin')
        self.libdir = os.path.join(self.autodir, 'lib')
        self.testdir = os.path.join(self.autodir, 'tests')
        self.configdir = os.path.join(self.autodir, 'config')
        self.site_testdir = os.path.join(self.autodir, 'site_tests')
        self.profdir = os.path.join(self.autodir, 'profilers')
        self.tmpdir = os.path.join(self.autodir, 'tmp')
        self.toolsdir = os.path.join(self.autodir, 'tools')
        self.resultdir = os.path.join(self.autodir, 'results', jobtag)

        self.control = os.path.realpath(control)
        self._is_continuation = cont
        self.state_file = self.control + '.state'
        self.current_step_ancestry = []
        self.next_step_index = 0
        self.testtag = ''
        self._test_tag_prefix = ''

        self._load_state()
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout':1800})
        self.pkgdir = os.path.join(self.autodir, 'packages')
        self.run_test_cleanup = self.get_state("__run_test_cleanup",
                                               default=True)

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        self.last_boot_tag = self.get_state("__last_boot_tag", default=None)
        self.job_log = debug.get_logger(module='client')

        if not cont:
            """
            Don't clean up the tmp dir (which contains the lockfile)
            in the constructor; that would be a problem for multiple
            jobs starting at the same time on the same client. Instead
            do the delete on the server side. We simply create the tmp
            directory here if it does not already exist.
            """
152 """ 153 if not os.path.exists(self.tmpdir): 154 os.mkdir(self.tmpdir) 155 156 if not os.path.exists(self.pkgdir): 157 os.mkdir(self.pkgdir) 158 159 results = os.path.join(self.autodir, 'results') 160 if not os.path.exists(results): 161 os.mkdir(results) 162 163 download = os.path.join(self.testdir, 'download') 164 if not os.path.exists(download): 165 os.mkdir(download) 166 167 if os.path.exists(self.resultdir): 168 utils.system('rm -rf ' + self.resultdir) 169 os.mkdir(self.resultdir) 170 171 os.mkdir(os.path.join(self.resultdir, 'debug')) 172 os.mkdir(os.path.join(self.resultdir, 'analysis')) 173 174 shutil.copyfile(self.control, 175 os.path.join(self.resultdir, 'control')) 176 177 178 self.control = control 179 self.jobtag = jobtag 180 self.log_filename = self.DEFAULT_LOG_FILENAME 181 self.container = None 182 183 self.stdout = fd_stack.fd_stack(1, sys.stdout) 184 self.stderr = fd_stack.fd_stack(2, sys.stderr) 185 186 self._init_group_level() 187 188 self.config = config.config(self) 189 self.harness = harness.select(harness_type, self) 190 self.profilers = profilers.profilers(self) 191 192 try: 193 tool = self.config_get('boottool.executable') 194 self.bootloader = boottool.boottool(tool) 195 except: 196 pass 197 198 self.sysinfo.log_per_reboot_data() 199 200 if not cont: 201 self.record('START', None, None) 202 self._increment_group_level() 203 204 self.harness.run_start() 205 206 if use_external_logging: 207 self.enable_external_logging() 208 209 # load the max disk usage rate - default to no monitoring 210 self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0) 211 212 213 def monitor_disk_usage(self, max_rate): 214 """\ 215 Signal that the job should monitor disk space usage on / 216 and generate a warning if a test uses up disk space at a 217 rate exceeding 'max_rate'. 218 219 Parameters: 220 max_rate - the maximium allowed rate of disk consumption 221 during a test, in MB/hour, or 0 to indicate 222 no limit. 223 """ 224 self.set_state('__monitor_disk', max_rate) 225 self.max_disk_usage_rate = max_rate 226 227 228 def relative_path(self, path): 229 """\ 230 Return a patch relative to the job results directory 231 """ 232 head = len(self.resultdir) + 1 # remove the / inbetween 233 return path[head:] 234 235 236 def control_get(self): 237 return self.control 238 239 240 def control_set(self, control): 241 self.control = os.path.abspath(control) 242 243 244 def harness_select(self, which): 245 self.harness = harness.select(which, self) 246 247 248 def config_set(self, name, value): 249 self.config.set(name, value) 250 251 252 def config_get(self, name): 253 return self.config.get(name) 254 255 256 def setup_dirs(self, results_dir, tmp_dir): 257 if not tmp_dir: 258 tmp_dir = os.path.join(self.tmpdir, 'build') 259 if not os.path.exists(tmp_dir): 260 os.mkdir(tmp_dir) 261 if not os.path.isdir(tmp_dir): 262 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 263 raise ValueError(e_msg) 264 265 # We label the first build "build" and then subsequent ones 266 # as "build.2", "build.3", etc. Whilst this is a little bit 267 # inconsistent, 99.9% of jobs will only have one build 268 # (that's not done as kernbench, sparse, or buildtest), 269 # so it works out much cleaner. One of life's comprimises. 


    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
            kjob = None ):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)


    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
                                  build_dir, leave)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : type of the package (ex: test, dep etc.)
        install_dir : the directory into which the source is actually
                      untarred (ex: client/profilers/<name> for profilers)
        '''
        if len(self.pkgmgr.repo_urls) > 0:
            self.pkgmgr.install_pkg(name, pkg_type,
                                    self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a list of strings.
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        # TODO(aganti): Validate the list of the repository URLs.
        repositories = repo_urls + self.pkgmgr.repo_urls
        self.pkgmgr = packages.PackageManager(
            self.autodir, repo_urls=repositories,
            run_function_dargs={'timeout':1800})
        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE, checksum_file_path,
                                  use_checksum=False)
        except packages.PackageFetchError, e:
            # The packaging system might not be working in this case.
            # Silently fall back to the normal case.
            pass


    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError, e:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except packages.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            utils.system('./' + dep + '.py')
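
    # Illustrative sketch (not part of the original source): a test or control
    # file would typically set up a dependency and then locate its install
    # directory like this; 'libaio' is only an assumed example dependency name.
    #
    #     job.setup_dep(['libaio'])
    #     libaio_dir = os.path.join(job.autodir, 'deps', 'libaio')
    #     # the dependency's built files are now available under libaio_dir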


    def _runtest(self, url, tag, args, dargs):
        try:
            try:
                l = lambda : test.runtest(self, url, tag, args, dargs)
                pid = parallel.fork_start(self.resultdir, l)
                parallel.fork_waitfor(self.resultdir, pid)
            except error.TestBaseException:
                # These are already classified with an error type (exit_status)
                raise
            except error.JobError:
                raise  # Caught further up and turned into an ABORT.
            except Exception, e:
                # Converts all other exceptions thrown by the test regardless
                # of phase into a TestError(TestBaseException) subclass that
                # reports them with their full stack trace.
                raise error.UnhandledTestError(e)
        finally:
            # Reset the logging level to client level
            debug.configure(module='client')


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
            url of the test to run
        tag
            tag to add to testname
        """

        if not url:
            raise TypeError("Test name is invalid. "
                            "Switched arguments?")
        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
        namelen = len(testname)
        dargs = dargs.copy()
        tntag = dargs.pop('tag', None)
        if tntag:  # per-test tag is included in reported test name
            testname += '.' + str(tntag)
        run_number = self.get_run_number()
        if run_number:
            testname += '._%02d_' % run_number
            self.set_run_number(run_number + 1)
        if self._is_kernel_in_test_tag():
            testname += '.' + platform.release()
        if self._test_tag_prefix:
            testname += '.' + self._test_tag_prefix
        if self.testtag:  # job-level tag is included in reported test name
            testname += '.' + self.testtag
        subdir = testname
        sdtag = dargs.pop('subdir_tag', None)
        if sdtag:  # subdir-only tag is not included in reports
            subdir = subdir + '.' + str(sdtag)
        tag = subdir[namelen+1:]  # '' if none

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag) )
            raise error.TestError(msg)
        # NOTE: client/common_lib/test.py runtest() depends on directory names
        # being constructed the same way as in this code.
        os.mkdir(outputdir)

        container = dargs.pop('container', None)
        if container:
            cname = container.get('name', None)
            if not cname:   # get old name
                cname = container.get('container_name', None)
            mbytes = container.get('mbytes', None)
            if not mbytes:  # get old name
                mbytes = container.get('mem', None)
            cpus = container.get('cpus', None)
            if not cpus:    # get old name
                cpus = container.get('cpu', None)
            root = container.get('root', None)
            network = container.get('network', None)
            disk = container.get('disk', None)
            try:
                self.new_container(mbytes=mbytes, cpus=cpus, root=root,
                                   name=cname, network=network, disk=disk)
            except cpuset.CpusetsNotAvailable, e:
                self.record("START", subdir, testname)
                self._increment_group_level()
                self.record("ERROR", subdir, testname, str(e))
                self._decrement_group_level()
                self.record("END ERROR", subdir, testname)
                return False
            # We are running in a container now...

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            self._rungroup(subdir, testname, group_func)
            if container:
                self.release_container()
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.
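
    # Illustrative sketch (not part of the original source): a control file
    # typically drives this method directly. 'sleeptest' and its 'seconds'
    # argument are assumed example names, not defined by this module.
    #
    #     job.run_test('sleeptest', seconds=1, tag='quick')
    #     job.run_test('sleeptest', seconds=60, tag='long')
    #
    # The per-test tag puts the two runs into separate result subdirectories,
    # sleeptest.quick/ and sleeptest.long/.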


    def _rungroup(self, subdir, testname, function, *args, **dargs):
        """\
        subdir:
            name of the group
        testname:
            name of the test to run, or support step
        function:
            subroutine to run
        *args:
            arguments for the function

        Returns the result of the passed in function
        """

        try:
            self.record('START', subdir, testname)
            self._increment_group_level()
            result = function(*args, **dargs)
            self._decrement_group_level()
            self.record('END GOOD', subdir, testname)
            return result
        except error.TestBaseException, e:
            self._decrement_group_level()
            self.record('END %s' % e.exit_status, subdir, testname)
            raise
        except error.JobError, e:
            self._decrement_group_level()
            self.record('END ABORT', subdir, testname)
            raise
        except Exception, e:
            # This should only ever happen due to a bug in the given
            # function's code. The common case of being called by
            # run_test() will never reach this. If a control file called
            # run_group() itself, bugs in its function will be caught
            # here.
            self._decrement_group_level()
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END ERROR', subdir, testname, err_msg)
            raise


    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
            Callable to run.
        tag:
            An optional tag name for the group. If None (default)
            function.__name__ will be used.
        **dargs:
            Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)
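
    # Illustrative sketch (not part of the original source): wrapping a helper
    # in its own START/END group in the status log; the function, command and
    # tag names below are made up.
    #
    #     def configure_nic():
    #         utils.system('ifconfig eth0 mtu 9000')   # example command
    #     job.run_group(configure_nic, tag='jumbo_frames')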
554 """ 555 if tag: 556 name = tag 557 else: 558 name = function.__name__ 559 560 try: 561 return self._rungroup(subdir=None, testname=name, 562 function=function, **dargs) 563 except (SystemExit, error.TestBaseException): 564 raise 565 # If there was a different exception, turn it into a TestError. 566 # It will be caught by step_engine or _run_step_fn. 567 except Exception, e: 568 raise error.UnhandledTestError(e) 569 570 571 _RUN_NUMBER_STATE = '__run_number' 572 def get_run_number(self): 573 """Get the run number or 0 if no run number has been set.""" 574 return self.get_state(self._RUN_NUMBER_STATE, default=0) 575 576 577 def set_run_number(self, number): 578 """If the run number is non-zero it will be in the output dir name.""" 579 self.set_state(self._RUN_NUMBER_STATE, number) 580 581 582 _KERNEL_IN_TAG_STATE = '__kernel_version_in_test_tag' 583 def _is_kernel_in_test_tag(self): 584 """Boolean: should the kernel version be included in the test tag.""" 585 return self.get_state(self._KERNEL_IN_TAG_STATE, default=False) 586 587 588 def show_kernel_in_test_tag(self, value=True): 589 """If True, the kernel version at test time will prefix test tags.""" 590 self.set_state(self._KERNEL_IN_TAG_STATE, value) 591 592 593 def set_test_tag(self, tag=''): 594 """Set tag to be added to test name of all following run_test steps.""" 595 self.testtag = tag 596 597 598 def set_test_tag_prefix(self, prefix): 599 """Set a prefix string to prepend to all future run_test steps. 600 601 Args: 602 prefix: A string to prepend to any run_test() step tags separated by 603 a '.'; use the empty string '' to clear it. 604 """ 605 self._test_tag_prefix = prefix 606 607 608 def new_container(self, mbytes=None, cpus=None, root=None, name=None, 609 network=None, disk=None, kswapd_merge=False): 610 if not utils.grep('cpuset', '/proc/filesystems'): 611 print "Containers not enabled by latest reboot" 612 return # containers weren't enabled in this kernel boot 613 pid = os.getpid() 614 if not name: 615 name = 'test%d' % pid # make arbitrary unique name 616 self.container = cpuset.cpuset(name, job_size=mbytes, job_pid=pid, 617 cpus=cpus, root=root, network=network, 618 disk=disk, kswapd_merge=kswapd_merge) 619 # This job's python shell is now running in the new container 620 # and all forked test processes will inherit that container 621 622 623 def release_container(self): 624 if self.container: 625 self.container = self.container.release() 626 627 628 def cpu_count(self): 629 if self.container: 630 return len(self.container.cpus) 631 return utils.count_cpus() # use total system count 632 633 634 def start_reboot(self): 635 self.record('START', None, 'reboot') 636 self._increment_group_level() 637 self.record('GOOD', None, 'reboot.start') 638 639 640 def end_reboot(self, subdir, kernel, patches): 641 kernel_info = {"kernel": kernel} 642 for i, patch in enumerate(patches): 643 kernel_info["patch%d" % i] = patch 644 self._decrement_group_level() 645 self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info) 646 647 648 def end_reboot_and_verify(self, expected_when, expected_id, subdir, 649 type='src', patches=[]): 650 """ Check the passed kernel identifier against the command line 651 and the running kernel, abort the job on missmatch. 
""" 652 653 print (("POST BOOT: checking booted kernel " + 654 "mark=%d identity='%s' type='%s'") % 655 (expected_when, expected_id, type)) 656 657 running_id = utils.running_os_ident() 658 659 cmdline = utils.read_one_line("/proc/cmdline") 660 661 find_sum = re.compile(r'.*IDENT=(\d+)') 662 m = find_sum.match(cmdline) 663 cmdline_when = -1 664 if m: 665 cmdline_when = int(m.groups()[0]) 666 667 # We have all the facts, see if they indicate we 668 # booted the requested kernel or not. 669 bad = False 670 if (type == 'src' and expected_id != running_id or 671 type == 'rpm' and 672 not running_id.startswith(expected_id + '::')): 673 print "check_kernel_ident: kernel identifier mismatch" 674 bad = True 675 if expected_when != cmdline_when: 676 print "check_kernel_ident: kernel command line mismatch" 677 bad = True 678 679 if bad: 680 print " Expected Ident: " + expected_id 681 print " Running Ident: " + running_id 682 print " Expected Mark: %d" % (expected_when) 683 print "Command Line Mark: %d" % (cmdline_when) 684 print " Command Line: " + cmdline 685 686 self.record("ABORT", subdir, "reboot.verify", "boot failure") 687 self._decrement_group_level() 688 kernel = {"kernel": running_id.split("::")[0]} 689 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) 690 raise error.JobError("reboot returned with the wrong kernel") 691 692 self.record('GOOD', subdir, 'reboot.verify', expected_id) 693 self.end_reboot(subdir, expected_id, patches) 694 695 696 def filesystem(self, device, mountpoint=None, loop_size=0): 697 if not mountpoint: 698 mountpoint = self.tmpdir 699 return partition.partition(self, device, loop_size) 700 701 702 def enable_external_logging(self): 703 pass 704 705 706 def disable_external_logging(self): 707 pass 708 709 710 def enable_test_cleanup(self): 711 """ By default tests run test.cleanup """ 712 self.set_state("__run_test_cleanup", True) 713 self.run_test_cleanup = True 714 715 716 def disable_test_cleanup(self): 717 """ By default tests do not run test.cleanup """ 718 self.set_state("__run_test_cleanup", False) 719 self.run_test_cleanup = False 720 721 722 def default_test_cleanup(self, val): 723 if not self._is_continuation: 724 self.set_state("__run_test_cleanup", val) 725 self.run_test_cleanup = val 726 727 728 def default_boot_tag(self, tag): 729 if not self._is_continuation: 730 self.set_state("__last_boot_tag", tag) 731 self.last_boot_tag = tag 732 733 734 def reboot_setup(self): 735 pass 736 737 738 def reboot(self, tag=LAST_BOOT_TAG): 739 if tag == LAST_BOOT_TAG: 740 tag = self.last_boot_tag 741 else: 742 self.set_state("__last_boot_tag", tag) 743 self.last_boot_tag = tag 744 745 self.reboot_setup() 746 self.harness.run_reboot() 747 default = self.config_get('boot.set_default') 748 if default: 749 self.bootloader.set_default(tag) 750 else: 751 self.bootloader.boot_once(tag) 752 753 # HACK: using this as a module sometimes hangs shutdown, so if it's 754 # installed unload it first 755 utils.system("modprobe -r netconsole", ignore_status=True) 756 757 cmd = "(sleep 5; reboot) </dev/null >/dev/null 2>&1 &" 758 utils.system(cmd) 759 self.quit() 760 761 762 def noop(self, text): 763 print "job: noop: " + text 764 765 766 @_run_test_complete_on_exit 767 def parallel(self, *tasklist): 768 """Run tasks in parallel""" 769 770 pids = [] 771 old_log_filename = self.log_filename 772 for i, task in enumerate(tasklist): 773 assert isinstance(task, (tuple, list)) 774 self.log_filename = old_log_filename + (".%d" % i) 775 task_func = lambda: task[0](*task[1:]) 776 


    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")


    def complete(self, status):
        """Clean up and exit"""
        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self.state_file))
        os.rename(self.state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)


    def set_state(self, var, val):
        # Deep copies make sure that the state can't be altered
        # without it being re-written. Perf wise, deep copies
        # are overshadowed by pickling/loading.
        self.state[var] = copy.deepcopy(val)
        outfile = open(self.state_file, 'w')
        try:
            pickle.dump(self.state, outfile, pickle.HIGHEST_PROTOCOL)
        finally:
            outfile.close()
        print "Persistent state variable %s now set to %r" % (var, val)


    def _load_state(self):
        if hasattr(self, 'state'):
            raise RuntimeError('state already exists')
        infile = None
        try:
            try:
                infile = open(self.state_file, 'rb')
                self.state = pickle.load(infile)
            except IOError:
                self.state = {}
                initialize = True
        finally:
            if infile:
                infile.close()

        initialize = '__steps' not in self.state
        if not (self._is_continuation or initialize):
            raise RuntimeError('Loaded state must contain __steps or be a '
                               'continuation.')

        if initialize:
            print 'Initializing the state engine.'
            self.set_state('__steps', [])  # writes pickle file


    def get_state(self, var, default=NO_DEFAULT):
        if var in self.state or default == NO_DEFAULT:
            val = self.state[var]
        else:
            val = default
        return copy.deepcopy(val)
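
    # Illustrative sketch (not part of the original source): persisting a value
    # across steps and reboots from a control file; '__my_counter' is an
    # assumed example key.
    #
    #     count = job.get_state('__my_counter', default=0)
    #     job.set_state('__my_counter', count + 1)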


    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name. Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self.current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self.get_state('__steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self.set_state('__steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self.get_state('__steps')
        steps.insert(self.next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self.next_step_index += 1
        self.set_state('__steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self.get_state('__steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self.next_step_index += 1
        self.set_state('__steps', steps)


    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps. Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added. In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames. This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self.current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])

        # Iterate through the steps. If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self.get_state('__steps')) > 0:
            steps = self.get_state('__steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self.set_state('__steps', steps)

            self.next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self.current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)
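
    # Illustrative sketch (not part of the original source): a control file
    # driving the step engine. Defining step_init() makes step_engine() queue
    # it as the first step; each step may queue further steps via
    # job.next_step(), and queued steps survive a reboot through the pickled
    # '__steps' state. The kernel paths and test name below are assumed
    # examples.
    #
    #     def step_init():
    #         job.next_step(step_test)
    #         testkernel = job.kernel('/usr/local/src/linux-2.6.18.tar.bz2')
    #         testkernel.config('/usr/local/src/config.example')  # example path
    #         testkernel.build()
    #         testkernel.boot()   # the job resumes at step_test after reboot
    #
    #     def step_test():
    #         job.run_test('kernbench', iterations=2)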


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()


    def _load_sysinfo_state(self):
        state = self.get_state("__sysinfo", None)
        if state:
            self.sysinfo.deserialize(state)


    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self.set_state("__sysinfo", state)


    def _init_group_level(self):
        self.group_level = self.get_state("__group_level", default=0)


    def _increment_group_level(self):
        self.group_level += 1
        self.set_state("__group_level", self.group_level)


    def _decrement_group_level(self):
        self.group_level -= 1
        self.set_state("__group_level", self.group_level)


    def record(self, status_code, subdir, operation, status = '',
               optional_fields=None):
        """
        Record job-level status.

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: (GOOD|WARN|FAIL|ABORT)
            or START
            or END (GOOD|WARN|FAIL|ABORT)

        subdir: MUST be a relevant subdirectory in the results,
            or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
            "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping and are
        governed by self.group_level.

        Multiline messages have secondary lines prefaced by a double
        space ('  ').
        """

        if subdir:
            if re.match(r'[\n\t]', subdir):
                raise ValueError("Invalid character in subdir string")
            substr = subdir
        else:
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError("Invalid status code supplied: %s" % status_code)
        if not operation:
            operation = '----'

        if re.match(r'[\n\t]', operation):
            raise ValueError("Invalid character in operation string")
        operation = operation.rstrip()

        if not optional_fields:
            optional_fields = {}

        status = status.rstrip()
        status = re.sub(r"\t", " ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)

        # Generate timestamps for inclusion in the logs
        epoch_time = int(time.time())  # seconds since epoch, in UTC
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        msg = '\t'.join(str(x) for x in fields)
        msg = '\t' * self.group_level + msg

        msg_tag = ""
        if "." in self.log_filename:
            msg_tag = self.log_filename.split(".", 1)[1]

        self.harness.test_status_detail(status_code, substr, operation, status,
                                        msg_tag)
        self.harness.test_status(msg, msg_tag)

        # log to stdout (if enabled)
        #if self.log_filename == self.DEFAULT_LOG_FILENAME:
        print msg

        # log to the "root" status log
        status_file = os.path.join(self.resultdir, self.log_filename)
        open(status_file, "a").write(msg + "\n")

        # log to the subdir status log (if subdir is set)
        if subdir:
            dir = os.path.join(self.resultdir, subdir)
            status_file = os.path.join(dir, self.DEFAULT_LOG_FILENAME)
            open(status_file, "a").write(msg + "\n")
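
    # Illustrative sketch (not part of the original source): a successful test
    # recorded at group_level 1 produces one tab-separated status line, roughly:
    #
    #     <tab>GOOD<tab>sleeptest.quick<tab>sleeptest.quick<tab>
    #     timestamp=1215781234<tab>localtime=Jul 11 13:20:34<tab>
    #     completed successfully
    #
    # (shown wrapped here; in the status file it is a single line and the
    # test name and timestamp values are made up)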


class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """ Generic decorator to wrap a function call with the
            standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator
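
# Illustrative sketch (not part of the original source): wrapping an arbitrary
# callable so its disk consumption on / is checked against a 500 MB/hour
# budget; warn() is a hypothetical logging callback.
#
#     @disk_usage_monitor.watch(warn, "/", 500)
#     def copy_lots_of_data():
#         ...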


def runjob(control, cont=False, tag="default", harness_type='',
           use_external_logging=False):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    Args:
        control: The control file to use for this job.
        cont: Whether this is the continuation of a previously started job.
        tag: The job tag string. ['default']
        harness_type: An alternative server harness. [None]
        use_external_logging: Should external logging be enabled? [False]
    """
    control = os.path.abspath(control)
    state = control + '.state'

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file; ensure we don't try to continue.
        if cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control, tag, cont, harness_type=harness_type,
                    use_external_logging=use_external_logging)

        # Load in the user's control file, which may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError, instance:
        print "JOB ERROR: " + instance.args[0]
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
            myjob.record('ABORT', None, command, instance.args[0])
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, instance.args[0])
            assert (myjob.group_level == 0), ('myjob.group_level must be 0,'
                                              ' not %d' % myjob.group_level)
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us. If we get here, it's likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        print "JOB ERROR (autotest bug?): " + msg
        if myjob:
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, msg)
            assert(myjob.group_level == 0)
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob._decrement_group_level()
    myjob.record('END GOOD', None, None)
    assert(myjob.group_level == 0)

    myjob.complete(0)


site_job = utils.import_site_class(
    __file__, "autotest_lib.client.bin.site_job", "site_job", base_job)

class job(site_job, base_job):
    pass
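
# Illustrative sketch (not part of the original source): the autotest client
# entry point would typically call runjob() along these lines; the control
# file path is just an example, and runjob() exits the process when the job
# finishes or pauses for a reboot.
#
#     from autotest_lib.client.bin import job
#     job.runjob('/usr/local/autotest/control', cont=False, tag='default')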