job.py revision 7afc3a678bdf39549f6397a9b8d5d181a628e992
1"""The main job wrapper 2 3This is the core infrastructure. 4 5Copyright Andy Whitcroft, Martin J. Bligh 2006 6""" 7 8import copy, os, platform, re, shutil, sys, time, traceback, types 9import cPickle as pickle 10from autotest_lib.client.bin import autotest_utils, parallel, kernel, xen 11from autotest_lib.client.bin import profilers, fd_stack, boottool, harness 12from autotest_lib.client.bin import config, sysinfo, cpuset, test, partition 13from autotest_lib.client.common_lib import error, barrier, log, utils 14from autotest_lib.client.common_lib import packages, debug 15 16LAST_BOOT_TAG = object() 17NO_DEFAULT = object() 18JOB_PREAMBLE = """ 19from autotest_lib.client.common_lib.error import * 20from autotest_lib.client.common_lib.utils import * 21from autotest_lib.client.bin.autotest_utils import * 22""" 23 24 25class StepError(error.AutotestError): 26 pass 27 28class NotAvailableError(error.AutotestError): 29 pass 30 31 32 33def _run_test_complete_on_exit(f): 34 """Decorator for job methods that automatically calls 35 self.harness.run_test_complete when the method exits, if appropriate.""" 36 def wrapped(self, *args, **dargs): 37 try: 38 return f(self, *args, **dargs) 39 finally: 40 if self.log_filename == self.DEFAULT_LOG_FILENAME: 41 self.harness.run_test_complete() 42 if self.drop_caches: 43 print "Dropping caches" 44 autotest_utils.drop_caches() 45 wrapped.__name__ = f.__name__ 46 wrapped.__doc__ = f.__doc__ 47 wrapped.__dict__.update(f.__dict__) 48 return wrapped 49 50 51class base_job(object): 52 """The actual job against which we do everything. 53 54 Properties: 55 autodir 56 The top level autotest directory (/usr/local/autotest). 57 Comes from os.environ['AUTODIR']. 
58 bindir 59 <autodir>/bin/ 60 libdir 61 <autodir>/lib/ 62 testdir 63 <autodir>/tests/ 64 configdir 65 <autodir>/config/ 66 site_testdir 67 <autodir>/site_tests/ 68 profdir 69 <autodir>/profilers/ 70 tmpdir 71 <autodir>/tmp/ 72 pkgdir 73 <autodir>/packages/ 74 resultdir 75 <autodir>/results/<jobtag> 76 toolsdir 77 <autodir>/tools/ 78 stdout 79 fd_stack object for stdout 80 stderr 81 fd_stack object for stderr 82 profilers 83 the profilers object for this job 84 harness 85 the server harness object for this job 86 config 87 the job configuration for this job 88 """ 89 90 DEFAULT_LOG_FILENAME = "status" 91 92 def __init__(self, control, jobtag, cont, harness_type=None, 93 use_external_logging = False, drop_caches=True): 94 """ 95 control 96 The control file (pathname of) 97 jobtag 98 The job tag string (eg "default") 99 cont 100 If this is the continuation of this job 101 harness_type 102 An alternative server harness 103 """ 104 self.drop_caches = drop_caches 105 if self.drop_caches: 106 print "Dropping caches" 107 autotest_utils.drop_caches() 108 109 self.autodir = os.environ['AUTODIR'] 110 self.bindir = os.path.join(self.autodir, 'bin') 111 self.libdir = os.path.join(self.autodir, 'lib') 112 self.testdir = os.path.join(self.autodir, 'tests') 113 self.configdir = os.path.join(self.autodir, 'config') 114 self.site_testdir = os.path.join(self.autodir, 'site_tests') 115 self.profdir = os.path.join(self.autodir, 'profilers') 116 self.tmpdir = os.path.join(self.autodir, 'tmp') 117 self.toolsdir = os.path.join(self.autodir, 'tools') 118 self.resultdir = os.path.join(self.autodir, 'results', jobtag) 119 120 self.control = os.path.realpath(control) 121 self._is_continuation = cont 122 self.state_file = self.control + '.state' 123 self.current_step_ancestry = [] 124 self.next_step_index = 0 125 self.testtag = '' 126 self._test_tag_prefix = '' 127 128 self._load_state() 129 self.pkgmgr = packages.PackageManager( 130 self.autodir, run_function_dargs={'timeout':1800}) 131 
self.pkgdir = os.path.join(self.autodir, 'packages') 132 self.run_test_cleanup = self.get_state("__run_test_cleanup", 133 default=True) 134 135 self.sysinfo = sysinfo.sysinfo(self.resultdir) 136 self._load_sysinfo_state() 137 138 self.last_boot_tag = self.get_state("__last_boot_tag", default=None) 139 self.job_log = debug.get_logger(module='client') 140 141 if not cont: 142 """ 143 Don't cleanup the tmp dir (which contains the lockfile) 144 in the constructor, this would be a problem for multiple 145 jobs starting at the same time on the same client. Instead 146 do the delete at the server side. We simply create the tmp 147 directory here if it does not already exist. 148 """ 149 if not os.path.exists(self.tmpdir): 150 os.mkdir(self.tmpdir) 151 152 if not os.path.exists(self.pkgdir): 153 os.mkdir(self.pkgdir) 154 155 results = os.path.join(self.autodir, 'results') 156 if not os.path.exists(results): 157 os.mkdir(results) 158 159 download = os.path.join(self.testdir, 'download') 160 if not os.path.exists(download): 161 os.mkdir(download) 162 163 if os.path.exists(self.resultdir): 164 utils.system('rm -rf ' + self.resultdir) 165 os.mkdir(self.resultdir) 166 167 os.mkdir(os.path.join(self.resultdir, 'debug')) 168 os.mkdir(os.path.join(self.resultdir, 'analysis')) 169 170 shutil.copyfile(self.control, 171 os.path.join(self.resultdir, 'control')) 172 173 174 self.control = control 175 self.jobtag = jobtag 176 self.log_filename = self.DEFAULT_LOG_FILENAME 177 self.container = None 178 179 self.stdout = fd_stack.fd_stack(1, sys.stdout) 180 self.stderr = fd_stack.fd_stack(2, sys.stderr) 181 182 self._init_group_level() 183 184 self.config = config.config(self) 185 self.harness = harness.select(harness_type, self) 186 self.profilers = profilers.profilers(self) 187 188 try: 189 tool = self.config_get('boottool.executable') 190 self.bootloader = boottool.boottool(tool) 191 except: 192 pass 193 194 self.sysinfo.log_per_reboot_data() 195 196 if not cont: 197 
self.record('START', None, None) 198 self._increment_group_level() 199 200 self.harness.run_start() 201 202 if use_external_logging: 203 self.enable_external_logging() 204 205 # load the max disk usage rate - default to no monitoring 206 self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0) 207 208 209 def monitor_disk_usage(self, max_rate): 210 """\ 211 Signal that the job should monitor disk space usage on / 212 and generate a warning if a test uses up disk space at a 213 rate exceeding 'max_rate'. 214 215 Parameters: 216 max_rate - the maximium allowed rate of disk consumption 217 during a test, in MB/hour, or 0 to indicate 218 no limit. 219 """ 220 self.set_state('__monitor_disk', max_rate) 221 self.max_disk_usage_rate = max_rate 222 223 224 def relative_path(self, path): 225 """\ 226 Return a patch relative to the job results directory 227 """ 228 head = len(self.resultdir) + 1 # remove the / inbetween 229 return path[head:] 230 231 232 def control_get(self): 233 return self.control 234 235 236 def control_set(self, control): 237 self.control = os.path.abspath(control) 238 239 240 def harness_select(self, which): 241 self.harness = harness.select(which, self) 242 243 244 def config_set(self, name, value): 245 self.config.set(name, value) 246 247 248 def config_get(self, name): 249 return self.config.get(name) 250 251 252 def setup_dirs(self, results_dir, tmp_dir): 253 if not tmp_dir: 254 tmp_dir = os.path.join(self.tmpdir, 'build') 255 if not os.path.exists(tmp_dir): 256 os.mkdir(tmp_dir) 257 if not os.path.isdir(tmp_dir): 258 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 259 raise ValueError(e_msg) 260 261 # We label the first build "build" and then subsequent ones 262 # as "build.2", "build.3", etc. Whilst this is a little bit 263 # inconsistent, 99.9% of jobs will only have one build 264 # (that's not done as kernbench, sparse, or buildtest), 265 # so it works out much cleaner. One of life's comprimises. 
266 if not results_dir: 267 results_dir = os.path.join(self.resultdir, 'build') 268 i = 2 269 while os.path.exists(results_dir): 270 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 271 i += 1 272 if not os.path.exists(results_dir): 273 os.mkdir(results_dir) 274 275 return (results_dir, tmp_dir) 276 277 278 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 279 kjob = None ): 280 """Summon a xen object""" 281 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 282 build_dir = 'xen' 283 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, 284 leave, kjob) 285 286 287 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 288 """Summon a kernel object""" 289 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 290 build_dir = 'linux' 291 return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir, 292 build_dir, leave) 293 294 295 def barrier(self, *args, **kwds): 296 """Create a barrier object""" 297 return barrier.barrier(*args, **kwds) 298 299 300 def install_pkg(self, name, pkg_type, install_dir): 301 ''' 302 This method is a simple wrapper around the actual package 303 installation method in the Packager class. This is used 304 internally by the profilers, deps and tests code. 305 name : name of the package (ex: sleeptest, dbench etc.) 306 pkg_type : Type of the package (ex: test, dep etc.) 307 install_dir : The directory in which the source is actually 308 untarred into. (ex: client/profilers/<name> for profilers) 309 ''' 310 if len(self.pkgmgr.repo_urls) > 0: 311 self.pkgmgr.install_pkg(name, pkg_type, 312 self.pkgdir, install_dir) 313 314 315 def add_repository(self, repo_urls): 316 ''' 317 Adds the repository locations to the job so that packages 318 can be fetched from them when needed. The repository list 319 needs to be a string list 320 Ex: job.add_repository(['http://blah1','http://blah2']) 321 ''' 322 # TODO(aganti): Validate the list of the repository URLs. 
323 repositories = repo_urls + self.pkgmgr.repo_urls 324 self.pkgmgr = packages.PackageManager( 325 self.autodir, repo_urls=repositories, 326 run_function_dargs={'timeout':1800}) 327 # Fetch the packages' checksum file that contains the checksums 328 # of all the packages if it is not already fetched. The checksum 329 # is always fetched whenever a job is first started. This 330 # is not done in the job's constructor as we don't have the list of 331 # the repositories there (and obviously don't care about this file 332 # if we are not using the repos) 333 try: 334 checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir, 335 packages.CHECKSUM_FILE) 336 self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE, checksum_file_path, 337 use_checksum=False) 338 except packages.PackageFetchError, e: 339 # packaging system might not be working in this case 340 # Silently fall back to the normal case 341 pass 342 343 344 def require_gcc(self): 345 """ 346 Test whether gcc is installed on the machine. 347 """ 348 # check if gcc is installed on the system. 349 try: 350 utils.system('which gcc') 351 except error.CmdError, e: 352 raise NotAvailableError('gcc is required by this job and is ' 353 'not available on the system') 354 355 356 def setup_dep(self, deps): 357 """Set up the dependencies for this test. 358 deps is a list of libraries required for this test. 359 """ 360 # Fetch the deps from the repositories and set them up. 361 for dep in deps: 362 dep_dir = os.path.join(self.autodir, 'deps', dep) 363 # Search for the dependency in the repositories if specified, 364 # else check locally. 
365 try: 366 self.install_pkg(dep, 'dep', dep_dir) 367 except packages.PackageInstallError: 368 # see if the dep is there locally 369 pass 370 371 # dep_dir might not exist if it is not fetched from the repos 372 if not os.path.exists(dep_dir): 373 raise error.TestError("Dependency %s does not exist" % dep) 374 375 os.chdir(dep_dir) 376 utils.system('./' + dep + '.py') 377 378 379 def _runtest(self, url, tag, args, dargs): 380 try: 381 try: 382 l = lambda : test.runtest(self, url, tag, args, dargs) 383 pid = parallel.fork_start(self.resultdir, l) 384 parallel.fork_waitfor(self.resultdir, pid) 385 except error.TestBaseException: 386 raise 387 except Exception, e: 388 raise error.UnhandledTestError(e) 389 finally: 390 # Reset the logging level to client level 391 debug.configure(module='client') 392 393 394 @_run_test_complete_on_exit 395 def run_test(self, url, *args, **dargs): 396 """Summon a test object and run it. 397 398 tag 399 tag to add to testname 400 url 401 url of the test to run 402 """ 403 404 if not url: 405 raise TypeError("Test name is invalid. " 406 "Switched arguments?") 407 (group, testname) = self.pkgmgr.get_package_name(url, 'test') 408 namelen = len(testname) 409 dargs = dargs.copy() 410 tntag = dargs.pop('tag', None) 411 if tntag: # per-test tag is included in reported test name 412 testname += '.' + str(tntag) 413 run_number = self.get_run_number() 414 if run_number: 415 testname += '._%02d_' % run_number 416 self.set_run_number(run_number + 1) 417 if self._is_kernel_in_test_tag(): 418 testname += '.' + platform.release() 419 if self._test_tag_prefix: 420 testname += '.' + self._test_tag_prefix 421 if self.testtag: # job-level tag is included in reported test name 422 testname += '.' + self.testtag 423 subdir = testname 424 sdtag = dargs.pop('subdir_tag', None) 425 if sdtag: # subdir-only tag is not included in reports 426 subdir = subdir + '.' 
+ str(sdtag) 427 tag = subdir[namelen+1:] # '' if none 428 429 outputdir = os.path.join(self.resultdir, subdir) 430 if os.path.exists(outputdir): 431 msg = ("%s already exists, test <%s> may have" 432 " already run with tag <%s>" 433 % (outputdir, testname, tag) ) 434 raise error.TestError(msg) 435 # NOTE: client/common_lib/test.py runtest() depends directory names 436 # being constructed the same way as in this code. 437 os.mkdir(outputdir) 438 439 container = dargs.pop('container', None) 440 if container: 441 cname = container.get('name', None) 442 if not cname: # get old name 443 cname = container.get('container_name', None) 444 mbytes = container.get('mbytes', None) 445 if not mbytes: # get old name 446 mbytes = container.get('mem', None) 447 cpus = container.get('cpus', None) 448 if not cpus: # get old name 449 cpus = container.get('cpu', None) 450 root = container.get('root', None) 451 network = container.get('network', None) 452 disk = container.get('disk', None) 453 try: 454 self.new_container(mbytes=mbytes, cpus=cpus, root=root, 455 name=cname, network=network, disk=disk) 456 except cpuset.CpusetsNotAvailable, e: 457 self.record("START", subdir, testname) 458 self._increment_group_level() 459 self.record("ERROR", subdir, testname, str(e)) 460 self._decrement_group_level() 461 self.record("END ERROR", subdir, testname) 462 return False 463 # We are running in a container now... 
464 465 def log_warning(reason): 466 self.record("WARN", subdir, testname, reason) 467 @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate) 468 def group_func(): 469 try: 470 self._runtest(url, tag, args, dargs) 471 except error.TestBaseException, detail: 472 self.record(detail.exit_status, subdir, testname, str(detail)) 473 raise 474 except Exception, detail: 475 self.record('FAIL', subdir, testname, str(detail)) 476 raise 477 else: 478 self.record('GOOD', subdir, testname, 'completed successfully') 479 480 try: 481 self._rungroup(subdir, testname, group_func) 482 if container: 483 self.release_container() 484 return True 485 except error.TestBaseException, e: 486 return False 487 # Any other exception here will be given to the caller 488 489 490 def _rungroup(self, subdir, testname, function, *args, **dargs): 491 """\ 492 subdir: 493 name of the group 494 testname: 495 name of the test to run, or support step 496 function: 497 subroutine to run 498 *args: 499 arguments for the function 500 501 Returns the result of the passed in function 502 """ 503 504 try: 505 self.record('START', subdir, testname) 506 self._increment_group_level() 507 result = function(*args, **dargs) 508 self._decrement_group_level() 509 self.record('END GOOD', subdir, testname) 510 return result 511 except error.TestBaseException, e: 512 self._decrement_group_level() 513 self.record('END %s' % e.exit_status, subdir, testname, str(e)) 514 raise 515 except Exception, e: 516 self._decrement_group_level() 517 err_msg = str(e) + '\n' + traceback.format_exc() 518 self.record('END FAIL', subdir, testname, err_msg) 519 raise 520 521 522 def run_group(self, function, **dargs): 523 """\ 524 function: 525 subroutine to run 526 **dargs: 527 arguments for the function 528 """ 529 530 # Allow the tag for the group to be specified 531 tag = dargs.pop('tag', None) 532 if tag: 533 name = tag 534 else: 535 name = function.__name__ 536 537 try: 538 return self._rungroup(subdir=None, 
testname=name, 539 function=function, **dargs) 540 except error.TestError: 541 pass 542 # if there was a non-TestError exception, raise it 543 except Exception, e: 544 exc_info = sys.exc_info() 545 err = ''.join(traceback.format_exception(*exc_info)) 546 raise error.TestError(name + ' failed\n' + err) 547 548 549 _RUN_NUMBER_STATE = '__run_number' 550 def get_run_number(self): 551 """Get the run number or 0 if no run number has been set.""" 552 return self.get_state(self._RUN_NUMBER_STATE, default=0) 553 554 555 def set_run_number(self, number): 556 """If the run number is non-zero it will be in the output dir name.""" 557 self.set_state(self._RUN_NUMBER_STATE, number) 558 559 560 _KERNEL_IN_TAG_STATE = '__kernel_version_in_test_tag' 561 def _is_kernel_in_test_tag(self): 562 """Boolean: should the kernel version be included in the test tag.""" 563 return self.get_state(self._KERNEL_IN_TAG_STATE, default=False) 564 565 566 def show_kernel_in_test_tag(self, value=True): 567 """If True, the kernel version at test time will prefix test tags.""" 568 self.set_state(self._KERNEL_IN_TAG_STATE, value) 569 570 571 def set_test_tag(self, tag=''): 572 """Set tag to be added to test name of all following run_test steps.""" 573 self.testtag = tag 574 575 576 def set_test_tag_prefix(self, prefix): 577 """Set a prefix string to prepend to all future run_test steps. 578 579 Args: 580 prefix: A string to prepend to any run_test() step tags separated by 581 a '.'; use the empty string '' to clear it. 
582 """ 583 self._test_tag_prefix = prefix 584 585 586 def new_container(self, mbytes=None, cpus=None, root=None, name=None, 587 network=None, disk=None, kswapd_merge=False): 588 if not autotest_utils.grep('cpuset', '/proc/filesystems'): 589 print "Containers not enabled by latest reboot" 590 return # containers weren't enabled in this kernel boot 591 pid = os.getpid() 592 if not name: 593 name = 'test%d' % pid # make arbitrary unique name 594 self.container = cpuset.cpuset(name, job_size=mbytes, job_pid=pid, 595 cpus=cpus, root=root, network=network, 596 disk=disk, kswapd_merge=kswapd_merge) 597 # This job's python shell is now running in the new container 598 # and all forked test processes will inherit that container 599 600 601 def release_container(self): 602 if self.container: 603 self.container = self.container.release() 604 605 606 def cpu_count(self): 607 if self.container: 608 return len(self.container.cpus) 609 return autotest_utils.count_cpus() # use total system count 610 611 612 def start_reboot(self): 613 self.record('START', None, 'reboot') 614 self._increment_group_level() 615 self.record('GOOD', None, 'reboot.start') 616 617 618 def end_reboot(self, subdir, kernel, patches): 619 kernel_info = {"kernel": kernel} 620 for i, patch in enumerate(patches): 621 kernel_info["patch%d" % i] = patch 622 self._decrement_group_level() 623 self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info) 624 625 626 def end_reboot_and_verify(self, expected_when, expected_id, subdir, 627 type='src', patches=[]): 628 """ Check the passed kernel identifier against the command line 629 and the running kernel, abort the job on missmatch. 
""" 630 631 print (("POST BOOT: checking booted kernel " + 632 "mark=%d identity='%s' type='%s'") % 633 (expected_when, expected_id, type)) 634 635 running_id = autotest_utils.running_os_ident() 636 637 cmdline = utils.read_one_line("/proc/cmdline") 638 639 find_sum = re.compile(r'.*IDENT=(\d+)') 640 m = find_sum.match(cmdline) 641 cmdline_when = -1 642 if m: 643 cmdline_when = int(m.groups()[0]) 644 645 # We have all the facts, see if they indicate we 646 # booted the requested kernel or not. 647 bad = False 648 if (type == 'src' and expected_id != running_id or 649 type == 'rpm' and 650 not running_id.startswith(expected_id + '::')): 651 print "check_kernel_ident: kernel identifier mismatch" 652 bad = True 653 if expected_when != cmdline_when: 654 print "check_kernel_ident: kernel command line mismatch" 655 bad = True 656 657 if bad: 658 print " Expected Ident: " + expected_id 659 print " Running Ident: " + running_id 660 print " Expected Mark: %d" % (expected_when) 661 print "Command Line Mark: %d" % (cmdline_when) 662 print " Command Line: " + cmdline 663 664 self.record("ABORT", subdir, "reboot.verify", "boot failure") 665 self._decrement_group_level() 666 kernel = {"kernel": running_id.split("::")[0]} 667 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) 668 raise error.JobError("reboot returned with the wrong kernel") 669 670 self.record('GOOD', subdir, 'reboot.verify', expected_id) 671 self.end_reboot(subdir, expected_id, patches) 672 673 674 def filesystem(self, device, mountpoint = None, loop_size = 0): 675 if not mountpoint: 676 mountpoint = self.tmpdir 677 return partition.partition(self, device, mountpoint, loop_size) 678 679 680 def enable_external_logging(self): 681 pass 682 683 684 def disable_external_logging(self): 685 pass 686 687 688 def enable_test_cleanup(self): 689 """ By default tests run test.cleanup """ 690 self.set_state("__run_test_cleanup", True) 691 self.run_test_cleanup = True 692 693 694 def 
disable_test_cleanup(self): 695 """ By default tests do not run test.cleanup """ 696 self.set_state("__run_test_cleanup", False) 697 self.run_test_cleanup = False 698 699 700 def default_test_cleanup(self, val): 701 if not self._is_continuation: 702 self.set_state("__run_test_cleanup", val) 703 self.run_test_cleanup = val 704 705 706 def default_boot_tag(self, tag): 707 if not self._is_continuation: 708 self.set_state("__last_boot_tag", tag) 709 self.last_boot_tag = tag 710 711 712 def reboot_setup(self): 713 pass 714 715 716 def reboot(self, tag=LAST_BOOT_TAG): 717 if tag == LAST_BOOT_TAG: 718 tag = self.last_boot_tag 719 else: 720 self.set_state("__last_boot_tag", tag) 721 self.last_boot_tag = tag 722 723 self.reboot_setup() 724 self.harness.run_reboot() 725 default = self.config_get('boot.set_default') 726 if default: 727 self.bootloader.set_default(tag) 728 else: 729 self.bootloader.boot_once(tag) 730 731 # HACK: using this as a module sometimes hangs shutdown, so if it's 732 # installed unload it first 733 utils.system("modprobe -r netconsole", ignore_status=True) 734 735 cmd = "(sleep 5; reboot) </dev/null >/dev/null 2>&1 &" 736 utils.system(cmd) 737 self.quit() 738 739 740 def noop(self, text): 741 print "job: noop: " + text 742 743 744 @_run_test_complete_on_exit 745 def parallel(self, *tasklist): 746 """Run tasks in parallel""" 747 748 pids = [] 749 old_log_filename = self.log_filename 750 for i, task in enumerate(tasklist): 751 assert isinstance(task, (tuple, list)) 752 self.log_filename = old_log_filename + (".%d" % i) 753 task_func = lambda: task[0](*task[1:]) 754 pids.append(parallel.fork_start(self.resultdir, task_func)) 755 756 old_log_path = os.path.join(self.resultdir, old_log_filename) 757 old_log = open(old_log_path, "a") 758 exceptions = [] 759 for i, pid in enumerate(pids): 760 # wait for the task to finish 761 try: 762 parallel.fork_waitfor(self.resultdir, pid) 763 except Exception, e: 764 exceptions.append(e) 765 # copy the logs from the 
subtask into the main log 766 new_log_path = old_log_path + (".%d" % i) 767 if os.path.exists(new_log_path): 768 new_log = open(new_log_path) 769 old_log.write(new_log.read()) 770 new_log.close() 771 old_log.flush() 772 os.remove(new_log_path) 773 old_log.close() 774 775 self.log_filename = old_log_filename 776 777 # handle any exceptions raised by the parallel tasks 778 if exceptions: 779 msg = "%d task(s) failed in job.parallel" % len(exceptions) 780 raise error.JobError(msg) 781 782 783 def quit(self): 784 # XXX: should have a better name. 785 self.harness.run_pause() 786 raise error.JobContinue("more to come") 787 788 789 def complete(self, status): 790 """Clean up and exit""" 791 # We are about to exit 'complete' so clean up the control file. 792 dest = os.path.join(self.resultdir, os.path.basename(self.state_file)) 793 os.rename(self.state_file, dest) 794 795 self.harness.run_complete() 796 self.disable_external_logging() 797 sys.exit(status) 798 799 800 def set_state(self, var, val): 801 # Deep copies make sure that the state can't be altered 802 # without it being re-written. Perf wise, deep copies 803 # are overshadowed by pickling/loading. 
804 self.state[var] = copy.deepcopy(val) 805 outfile = open(self.state_file, 'w') 806 try: 807 pickle.dump(self.state, outfile, pickle.HIGHEST_PROTOCOL) 808 finally: 809 outfile.close() 810 print "Persistent state variable %s now set to %r" % (var, val) 811 812 813 def _load_state(self): 814 if hasattr(self, 'state'): 815 raise RuntimeError('state already exists') 816 infile = None 817 try: 818 try: 819 infile = open(self.state_file, 'rb') 820 self.state = pickle.load(infile) 821 except IOError: 822 self.state = {} 823 initialize = True 824 finally: 825 if infile: 826 infile.close() 827 828 initialize = '__steps' not in self.state 829 if not (self._is_continuation or initialize): 830 raise RuntimeError('Loaded state must contain __steps or be a ' 831 'continuation.') 832 833 if initialize: 834 print 'Initializing the state engine.' 835 self.set_state('__steps', []) # writes pickle file 836 837 838 def get_state(self, var, default=NO_DEFAULT): 839 if var in self.state or default == NO_DEFAULT: 840 val = self.state[var] 841 else: 842 val = default 843 return copy.deepcopy(val) 844 845 846 def __create_step_tuple(self, fn, args, dargs): 847 # Legacy code passes in an array where the first arg is 848 # the function or its name. 849 if isinstance(fn, list): 850 assert(len(args) == 0) 851 assert(len(dargs) == 0) 852 args = fn[1:] 853 fn = fn[0] 854 # Pickling actual functions is hairy, thus we have to call 855 # them by name. Unfortunately, this means only functions 856 # defined globally can be used as a next step. 
857 if callable(fn): 858 fn = fn.__name__ 859 if not isinstance(fn, types.StringTypes): 860 raise StepError("Next steps must be functions or " 861 "strings containing the function name") 862 ancestry = copy.copy(self.current_step_ancestry) 863 return (ancestry, fn, args, dargs) 864 865 866 def next_step_append(self, fn, *args, **dargs): 867 """Define the next step and place it at the end""" 868 steps = self.get_state('__steps') 869 steps.append(self.__create_step_tuple(fn, args, dargs)) 870 self.set_state('__steps', steps) 871 872 873 def next_step(self, fn, *args, **dargs): 874 """Create a new step and place it after any steps added 875 while running the current step but before any steps added in 876 previous steps""" 877 steps = self.get_state('__steps') 878 steps.insert(self.next_step_index, 879 self.__create_step_tuple(fn, args, dargs)) 880 self.next_step_index += 1 881 self.set_state('__steps', steps) 882 883 884 def next_step_prepend(self, fn, *args, **dargs): 885 """Insert a new step, executing first""" 886 steps = self.get_state('__steps') 887 steps.insert(0, self.__create_step_tuple(fn, args, dargs)) 888 self.next_step_index += 1 889 self.set_state('__steps', steps) 890 891 892 def _run_step_fn(self, local_vars, fn, args, dargs): 893 """Run a (step) function within the given context""" 894 895 local_vars['__args'] = args 896 local_vars['__dargs'] = dargs 897 exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars) 898 return local_vars['__ret'] 899 900 901 def _create_frame(self, global_vars, ancestry, fn_name): 902 """Set up the environment like it would have been when this 903 function was first defined. 904 905 Child step engine 'implementations' must have 'return locals()' 906 at end end of their steps. Because of this, we can call the 907 parent function and get back all child functions (i.e. those 908 defined within it). 
909 910 Unfortunately, the call stack of the function calling 911 job.next_step might have been deeper than the function it 912 added. In order to make sure that the environment is what it 913 should be, we need to then pop off the frames we built until 914 we find the frame where the function was first defined.""" 915 916 # The copies ensure that the parent frames are not modified 917 # while building child frames. This matters if we then 918 # pop some frames in the next part of this function. 919 current_frame = copy.copy(global_vars) 920 frames = [current_frame] 921 for steps_fn_name in ancestry: 922 ret = self._run_step_fn(current_frame, steps_fn_name, [], {}) 923 current_frame = copy.copy(ret) 924 frames.append(current_frame) 925 926 # Walk up the stack frames until we find the place fn_name was defined. 927 while len(frames) > 2: 928 if fn_name not in frames[-2]: 929 break 930 if frames[-2][fn_name] != frames[-1][fn_name]: 931 break 932 frames.pop() 933 ancestry.pop() 934 935 return (frames[-1], ancestry) 936 937 938 def _add_step_init(self, local_vars, current_function): 939 """If the function returned a dictionary that includes a 940 function named 'step_init', prepend it to our list of steps. 941 This will only get run the first time a function with a nested 942 use of the step engine is run.""" 943 944 if (isinstance(local_vars, dict) and 945 'step_init' in local_vars and 946 callable(local_vars['step_init'])): 947 # The init step is a child of the function 948 # we were just running. 949 self.current_step_ancestry.append(current_function) 950 self.next_step_prepend('step_init') 951 952 953 def step_engine(self): 954 """The multi-run engine used when the control file defines step_init. 955 956 Does the next step. 957 """ 958 959 # Set up the environment and then interpret the control file. 960 # Some control files will have code outside of functions, 961 # which means we need to have our state engine initialized 962 # before reading in the file. 
963 global_control_vars = {'job': self} 964 exec(JOB_PREAMBLE, global_control_vars, global_control_vars) 965 execfile(self.control, global_control_vars, global_control_vars) 966 967 # If we loaded in a mid-job state file, then we presumably 968 # know what steps we have yet to run. 969 if not self._is_continuation: 970 if 'step_init' in global_control_vars: 971 self.next_step(global_control_vars['step_init']) 972 973 # Iterate through the steps. If we reboot, we'll simply 974 # continue iterating on the next step. 975 while len(self.get_state('__steps')) > 0: 976 steps = self.get_state('__steps') 977 (ancestry, fn_name, args, dargs) = steps.pop(0) 978 self.set_state('__steps', steps) 979 980 self.next_step_index = 0 981 ret = self._create_frame(global_control_vars, ancestry, fn_name) 982 local_vars, self.current_step_ancestry = ret 983 local_vars = self._run_step_fn(local_vars, fn_name, args, dargs) 984 self._add_step_init(local_vars, fn_name) 985 986 987 def add_sysinfo_command(self, command, logfile=None, on_every_test=False): 988 self._add_sysinfo_loggable(sysinfo.command(command, logfile), 989 on_every_test) 990 991 992 def add_sysinfo_logfile(self, file, on_every_test=False): 993 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) 994 995 996 def _add_sysinfo_loggable(self, loggable, on_every_test): 997 if on_every_test: 998 self.sysinfo.test_loggables.add(loggable) 999 else: 1000 self.sysinfo.boot_loggables.add(loggable) 1001 self._save_sysinfo_state() 1002 1003 1004 def _load_sysinfo_state(self): 1005 state = self.get_state("__sysinfo", None) 1006 if state: 1007 self.sysinfo.deserialize(state) 1008 1009 1010 def _save_sysinfo_state(self): 1011 state = self.sysinfo.serialize() 1012 self.set_state("__sysinfo", state) 1013 1014 1015 def _init_group_level(self): 1016 self.group_level = self.get_state("__group_level", default=0) 1017 1018 1019 def _increment_group_level(self): 1020 self.group_level += 1 1021 self.set_state("__group_level", 
self.group_level) 1022 1023 1024 def _decrement_group_level(self): 1025 self.group_level -= 1 1026 self.set_state("__group_level", self.group_level) 1027 1028 1029 def record(self, status_code, subdir, operation, status = '', 1030 optional_fields=None): 1031 """ 1032 Record job-level status 1033 1034 The intent is to make this file both machine parseable and 1035 human readable. That involves a little more complexity, but 1036 really isn't all that bad ;-) 1037 1038 Format is <status code>\t<subdir>\t<operation>\t<status> 1039 1040 status code: (GOOD|WARN|FAIL|ABORT) 1041 or START 1042 or END (GOOD|WARN|FAIL|ABORT) 1043 1044 subdir: MUST be a relevant subdirectory in the results, 1045 or None, which will be represented as '----' 1046 1047 operation: description of what you ran (e.g. "dbench", or 1048 "mkfs -t foobar /dev/sda9") 1049 1050 status: error message or "completed sucessfully" 1051 1052 ------------------------------------------------------------ 1053 1054 Initial tabs indicate indent levels for grouping, and is 1055 governed by self.group_level 1056 1057 multiline messages have secondary lines prefaced by a double 1058 space (' ') 1059 """ 1060 1061 if subdir: 1062 if re.match(r'[\n\t]', subdir): 1063 raise ValueError("Invalid character in subdir string") 1064 substr = subdir 1065 else: 1066 substr = '----' 1067 1068 if not log.is_valid_status(status_code): 1069 raise ValueError("Invalid status code supplied: %s" % status_code) 1070 if not operation: 1071 operation = '----' 1072 1073 if re.match(r'[\n\t]', operation): 1074 raise ValueError("Invalid character in operation string") 1075 operation = operation.rstrip() 1076 1077 if not optional_fields: 1078 optional_fields = {} 1079 1080 status = status.rstrip() 1081 status = re.sub(r"\t", " ", status) 1082 # Ensure any continuation lines are marked so we can 1083 # detect them in the status file to ensure it is parsable. 
1084 status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", status) 1085 1086 # Generate timestamps for inclusion in the logs 1087 epoch_time = int(time.time()) # seconds since epoch, in UTC 1088 local_time = time.localtime(epoch_time) 1089 optional_fields["timestamp"] = str(epoch_time) 1090 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", 1091 local_time) 1092 1093 fields = [status_code, substr, operation] 1094 fields += ["%s=%s" % x for x in optional_fields.iteritems()] 1095 fields.append(status) 1096 1097 msg = '\t'.join(str(x) for x in fields) 1098 msg = '\t' * self.group_level + msg 1099 1100 msg_tag = "" 1101 if "." in self.log_filename: 1102 msg_tag = self.log_filename.split(".", 1)[1] 1103 1104 self.harness.test_status_detail(status_code, substr, operation, status, 1105 msg_tag) 1106 self.harness.test_status(msg, msg_tag) 1107 1108 # log to stdout (if enabled) 1109 #if self.log_filename == self.DEFAULT_LOG_FILENAME: 1110 print msg 1111 1112 # log to the "root" status log 1113 status_file = os.path.join(self.resultdir, self.log_filename) 1114 open(status_file, "a").write(msg + "\n") 1115 1116 # log to the subdir status log (if subdir is set) 1117 if subdir: 1118 dir = os.path.join(self.resultdir, subdir) 1119 status_file = os.path.join(dir, self.DEFAULT_LOG_FILENAME) 1120 open(status_file, "a").write(msg + "\n") 1121 1122 1123class disk_usage_monitor: 1124 def __init__(self, logging_func, device, max_mb_per_hour): 1125 self.func = logging_func 1126 self.device = device 1127 self.max_mb_per_hour = max_mb_per_hour 1128 1129 1130 def start(self): 1131 self.initial_space = autotest_utils.freespace(self.device) 1132 self.start_time = time.time() 1133 1134 1135 def stop(self): 1136 # if no maximum usage rate was set, we don't need to 1137 # generate any warnings 1138 if not self.max_mb_per_hour: 1139 return 1140 1141 final_space = autotest_utils.freespace(self.device) 1142 used_space = self.initial_space - final_space 1143 stop_time = 
time.time() 1144 total_time = stop_time - self.start_time 1145 # round up the time to one minute, to keep extremely short 1146 # tests from generating false positives due to short, badly 1147 # timed bursts of activity 1148 total_time = max(total_time, 60.0) 1149 1150 # determine the usage rate 1151 bytes_per_sec = used_space / total_time 1152 mb_per_sec = bytes_per_sec / 1024**2 1153 mb_per_hour = mb_per_sec * 60 * 60 1154 1155 if mb_per_hour > self.max_mb_per_hour: 1156 msg = ("disk space on %s was consumed at a rate of %.2f MB/hour") 1157 msg %= (self.device, mb_per_hour) 1158 self.func(msg) 1159 1160 1161 @classmethod 1162 def watch(cls, *monitor_args, **monitor_dargs): 1163 """ Generic decorator to wrap a function call with the 1164 standard create-monitor -> start -> call -> stop idiom.""" 1165 def decorator(func): 1166 def watched_func(*args, **dargs): 1167 monitor = cls(*monitor_args, **monitor_dargs) 1168 monitor.start() 1169 try: 1170 func(*args, **dargs) 1171 finally: 1172 monitor.stop() 1173 return watched_func 1174 return decorator 1175 1176 1177def runjob(control, cont = False, tag = "default", harness_type = '', 1178 use_external_logging = False): 1179 """The main interface to this module 1180 1181 control 1182 The control file to use for this job. 1183 cont 1184 Whether this is the continuation of a previously started job 1185 """ 1186 control = os.path.abspath(control) 1187 state = control + '.state' 1188 1189 # instantiate the job object ready for the control file. 1190 myjob = None 1191 try: 1192 # Check that the control file is valid 1193 if not os.path.exists(control): 1194 raise error.JobError(control + ": control file not found") 1195 1196 # When continuing, the job is complete when there is no 1197 # state file, ensure we don't try and continue. 
        if cont and not os.path.exists(state):
            raise error.JobComplete("all done")

        myjob = job(control, tag, cont, harness_type, use_external_logging)

        # Load in the user's control file; it may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        # NOTE(review): exit code 5 appears to signal the wrapper/harness
        # that the job should continue (e.g. after a reboot) — confirm.
        sys.exit(5)

    except error.JobComplete:
        # Nothing left to do for this job; exits non-zero so the caller
        # can distinguish it from a fresh, successful run.
        sys.exit(1)

    except error.JobError, instance:
        print "JOB ERROR: " + instance.args[0]
        if myjob:
            # Optional second arg of JobError is the command that failed.
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
            myjob.record('ABORT', None, command, instance.args[0])
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, instance.args[0])
            assert(myjob.group_level == 0)
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        # Unexpected failure: record the full traceback before aborting.
        msg = str(e) + '\n' + traceback.format_exc()
        print "JOB ERROR: " + msg
        if myjob:
            myjob._decrement_group_level()
            myjob.record('END ABORT', None, None, msg)
            assert(myjob.group_level == 0)
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good:
    # close out the top-level status group and mark the job finished.
    myjob._decrement_group_level()
    myjob.record('END GOOD', None, None)
    assert(myjob.group_level == 0)

    myjob.complete(0)


# site_job.py may be non-existent or empty, make sure that an appropriate
# site_job class is created nevertheless
try:
    from autotest_lib.client.bin.site_job import site_job
except ImportError:
    class site_job(object):
        pass

# site_job is listed first so site-specific overrides win in the MRO.
class job(site_job, base_job):
    pass