# job.py revision b931b687e09cec9b25f37d437f33be73b6de20f3
1"""The main job wrapper 2 3This is the core infrastructure. 4""" 5 6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006""" 7 8# standard stuff 9import os, sys, re, pickle, shutil, time, traceback, types, copy 10 11# autotest stuff 12from autotest_lib.client.bin import autotest_utils, parallel, kernel, xen 13from autotest_lib.client.bin import profilers, fd_stack, boottool, harness 14from autotest_lib.client.bin import config, sysinfo, cpuset, test, filesystem 15from autotest_lib.client.common_lib import error, barrier, log, utils 16from autotest_lib.client.common_lib import packages 17 18JOB_PREAMBLE = """ 19from autotest_lib.client.common_lib.error import * 20from autotest_lib.client.common_lib.utils import * 21from autotest_lib.client.bin.autotest_utils import * 22""" 23 24class StepError(error.AutotestError): 25 pass 26 27class NotAvailableError(error.AutotestError): 28 pass 29 30 31 32def _run_test_complete_on_exit(f): 33 """Decorator for job methods that automatically calls 34 self.harness.run_test_complete when the method exits, if appropriate.""" 35 def wrapped(self, *args, **dargs): 36 try: 37 return f(self, *args, **dargs) 38 finally: 39 if self.log_filename == self.DEFAULT_LOG_FILENAME: 40 self.harness.run_test_complete() 41 wrapped.__name__ = f.__name__ 42 wrapped.__doc__ = f.__doc__ 43 wrapped.__dict__.update(f.__dict__) 44 return wrapped 45 46 47class base_job(object): 48 """The actual job against which we do everything. 49 50 Properties: 51 autodir 52 The top level autotest directory (/usr/local/autotest). 53 Comes from os.environ['AUTODIR']. 
54 bindir 55 <autodir>/bin/ 56 libdir 57 <autodir>/lib/ 58 testdir 59 <autodir>/tests/ 60 site_testdir 61 <autodir>/site_tests/ 62 profdir 63 <autodir>/profilers/ 64 tmpdir 65 <autodir>/tmp/ 66 pkgdir 67 <autodir>/packages/ 68 resultdir 69 <autodir>/results/<jobtag> 70 toolsdir 71 <autodir>/tools/ 72 stdout 73 fd_stack object for stdout 74 stderr 75 fd_stack object for stderr 76 profilers 77 the profilers object for this job 78 harness 79 the server harness object for this job 80 config 81 the job configuration for this job 82 """ 83 84 DEFAULT_LOG_FILENAME = "status" 85 86 def __init__(self, control, jobtag, cont, harness_type=None, 87 use_external_logging = False): 88 """ 89 control 90 The control file (pathname of) 91 jobtag 92 The job tag string (eg "default") 93 cont 94 If this is the continuation of this job 95 harness_type 96 An alternative server harness 97 """ 98 self.drop_caches = True 99 self.autodir = os.environ['AUTODIR'] 100 self.bindir = os.path.join(self.autodir, 'bin') 101 self.libdir = os.path.join(self.autodir, 'lib') 102 self.testdir = os.path.join(self.autodir, 'tests') 103 self.site_testdir = os.path.join(self.autodir, 'site_tests') 104 self.profdir = os.path.join(self.autodir, 'profilers') 105 self.tmpdir = os.path.join(self.autodir, 'tmp') 106 self.toolsdir = os.path.join(self.autodir, 'tools') 107 self.resultdir = os.path.join(self.autodir, 'results', jobtag) 108 self.sysinfo = sysinfo.sysinfo(self.resultdir) 109 self.control = os.path.abspath(control) 110 self.state_file = self.control + '.state' 111 self.current_step_ancestry = [] 112 self.next_step_index = 0 113 self.testtag = '' 114 self._load_state() 115 self.pkgmgr = packages.PackageManager( 116 self.autodir, run_function_dargs={'timeout':600}) 117 self.pkgdir = os.path.join(self.autodir, 'packages') 118 self.run_test_cleanup = self.get_state("__run_test_cleanup", 119 default=True) 120 121 if not cont: 122 """ 123 Don't cleanup the tmp dir (which contains the lockfile) 124 in the 
constructor, this would be a problem for multiple 125 jobs starting at the same time on the same client. Instead 126 do the delete at the server side. We simply create the tmp 127 directory here if it does not already exist. 128 """ 129 if not os.path.exists(self.tmpdir): 130 os.mkdir(self.tmpdir) 131 132 if not os.path.exists(self.pkgdir): 133 os.mkdir(self.pkgdir) 134 135 results = os.path.join(self.autodir, 'results') 136 if not os.path.exists(results): 137 os.mkdir(results) 138 139 download = os.path.join(self.testdir, 'download') 140 if not os.path.exists(download): 141 os.mkdir(download) 142 143 if os.path.exists(self.resultdir): 144 utils.system('rm -rf ' + self.resultdir) 145 os.mkdir(self.resultdir) 146 147 os.mkdir(os.path.join(self.resultdir, 'debug')) 148 os.mkdir(os.path.join(self.resultdir, 'analysis')) 149 150 shutil.copyfile(self.control, 151 os.path.join(self.resultdir, 'control')) 152 153 154 self.control = control 155 self.jobtag = jobtag 156 self.log_filename = self.DEFAULT_LOG_FILENAME 157 self.container = None 158 159 self.stdout = fd_stack.fd_stack(1, sys.stdout) 160 self.stderr = fd_stack.fd_stack(2, sys.stderr) 161 162 self._init_group_level() 163 164 self.config = config.config(self) 165 self.harness = harness.select(harness_type, self) 166 self.profilers = profilers.profilers(self) 167 168 try: 169 tool = self.config_get('boottool.executable') 170 self.bootloader = boottool.boottool(tool) 171 except: 172 pass 173 174 self.sysinfo.log_per_reboot_data() 175 176 if not cont: 177 self.record('START', None, None) 178 self._increment_group_level() 179 180 self.harness.run_start() 181 182 if use_external_logging: 183 self.enable_external_logging() 184 185 # load the max disk usage rate - default to no monitoring 186 self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0) 187 188 189 def monitor_disk_usage(self, max_rate): 190 """\ 191 Signal that the job should monitor disk space usage on / 192 and generate a warning if a test 
uses up disk space at a 193 rate exceeding 'max_rate'. 194 195 Parameters: 196 max_rate - the maximium allowed rate of disk consumption 197 during a test, in MB/hour, or 0 to indicate 198 no limit. 199 """ 200 self.set_state('__monitor_disk', max_rate) 201 self.max_disk_usage_rate = max_rate 202 203 204 def relative_path(self, path): 205 """\ 206 Return a patch relative to the job results directory 207 """ 208 head = len(self.resultdir) + 1 # remove the / inbetween 209 return path[head:] 210 211 212 def control_get(self): 213 return self.control 214 215 216 def control_set(self, control): 217 self.control = os.path.abspath(control) 218 219 220 def harness_select(self, which): 221 self.harness = harness.select(which, self) 222 223 224 def config_set(self, name, value): 225 self.config.set(name, value) 226 227 228 def config_get(self, name): 229 return self.config.get(name) 230 231 232 def setup_dirs(self, results_dir, tmp_dir): 233 if not tmp_dir: 234 tmp_dir = os.path.join(self.tmpdir, 'build') 235 if not os.path.exists(tmp_dir): 236 os.mkdir(tmp_dir) 237 if not os.path.isdir(tmp_dir): 238 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 239 raise ValueError(e_msg) 240 241 # We label the first build "build" and then subsequent ones 242 # as "build.2", "build.3", etc. Whilst this is a little bit 243 # inconsistent, 99.9% of jobs will only have one build 244 # (that's not done as kernbench, sparse, or buildtest), 245 # so it works out much cleaner. One of life's comprimises. 
246 if not results_dir: 247 results_dir = os.path.join(self.resultdir, 'build') 248 i = 2 249 while os.path.exists(results_dir): 250 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 251 i += 1 252 if not os.path.exists(results_dir): 253 os.mkdir(results_dir) 254 255 return (results_dir, tmp_dir) 256 257 258 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 259 kjob = None ): 260 """Summon a xen object""" 261 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 262 build_dir = 'xen' 263 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, 264 leave, kjob) 265 266 267 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 268 """Summon a kernel object""" 269 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 270 build_dir = 'linux' 271 return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir, 272 build_dir, leave) 273 274 275 def barrier(self, *args, **kwds): 276 """Create a barrier object""" 277 return barrier.barrier(*args, **kwds) 278 279 280 def install_pkg(self, name, pkg_type, install_dir): 281 ''' 282 This method is a simple wrapper around the actual package 283 installation method in the Packager class. This is used 284 internally by the profilers, deps and tests code. 285 name : name of the package (ex: sleeptest, dbench etc.) 286 pkg_type : Type of the package (ex: test, dep etc.) 287 install_dir : The directory in which the source is actually 288 untarred into. (ex: client/profilers/<name> for profilers) 289 ''' 290 if len(self.pkgmgr.repo_urls) > 0: 291 self.pkgmgr.install_pkg(name, pkg_type, 292 self.pkgdir, install_dir) 293 294 295 def add_repository(self, repo_urls): 296 ''' 297 Adds the repository locations to the job so that packages 298 can be fetched from them when needed. The repository list 299 needs to be a string list 300 Ex: job.add_repository(['http://blah1','http://blah2']) 301 ''' 302 # TODO(aganti): Validate the list of the repository URLs. 
303 repositories = repo_urls + self.pkgmgr.repo_urls 304 self.pkgmgr = packages.PackageManager( 305 self.autodir, repo_urls=repositories, 306 run_function_dargs={'timeout':600}) 307 # Fetch the packages' checksum file that contains the checksums 308 # of all the packages if it is not already fetched. The checksum 309 # is always fetched whenever a job is first started. This 310 # is not done in the job's constructor as we don't have the list of 311 # the repositories there (and obviously don't care about this file 312 # if we are not using the repos) 313 try: 314 checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir, 315 packages.CHECKSUM_FILE) 316 self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE, checksum_file_path, 317 use_checksum=False) 318 except packages.PackageFetchError, e: 319 # packaging system might not be working in this case 320 # Silently fall back to the normal case 321 pass 322 323 324 def require_gcc(self): 325 """ 326 Test whether gcc is installed on the machine. 327 """ 328 # check if gcc is installed on the system. 329 try: 330 utils.system('which gcc') 331 except error.CmdError, e: 332 raise NotAvailableError('gcc is required by this job and is ' 333 'not available on the system') 334 335 336 def setup_dep(self, deps): 337 """Set up the dependencies for this test. 338 deps is a list of libraries required for this test. 339 """ 340 # Fetch the deps from the repositories and set them up. 341 for dep in deps: 342 dep_dir = os.path.join(self.autodir, 'deps', dep) 343 # Search for the dependency in the repositories if specified, 344 # else check locally. 
345 try: 346 self.install_pkg(dep, 'dep', dep_dir) 347 except packages.PackageInstallError: 348 # see if the dep is there locally 349 pass 350 351 # dep_dir might not exist if it is not fetched from the repos 352 if not os.path.exists(dep_dir): 353 raise error.TestError("Dependency %s does not exist" % dep) 354 355 os.chdir(dep_dir) 356 utils.system('./' + dep + '.py') 357 358 359 def _runtest(self, url, tag, args, dargs): 360 try: 361 l = lambda : test.runtest(self, url, tag, args, dargs) 362 pid = parallel.fork_start(self.resultdir, l) 363 parallel.fork_waitfor(self.resultdir, pid) 364 except error.TestBaseException: 365 raise 366 except Exception, e: 367 raise error.UnhandledTestError(e) 368 369 370 @_run_test_complete_on_exit 371 def run_test(self, url, *args, **dargs): 372 """Summon a test object and run it. 373 374 tag 375 tag to add to testname 376 url 377 url of the test to run 378 """ 379 380 if not url: 381 raise TypeError("Test name is invalid. " 382 "Switched arguments?") 383 (group, testname) = self.pkgmgr.get_package_name(url, 'test') 384 namelen = len(testname) 385 dargs = dargs.copy() 386 tntag = dargs.pop('tag', None) 387 if tntag: # per-test tag is included in reported test name 388 testname += '.' + str(tntag) 389 if self.testtag: # job-level tag is included in reported test name 390 testname += '.' + self.testtag 391 subdir = testname 392 sdtag = dargs.pop('subdir_tag', None) 393 if sdtag: # subdir-only tag is not included in reports 394 subdir = subdir + '.' 
+ str(sdtag) 395 tag = subdir[namelen+1:] # '' if none 396 397 outputdir = os.path.join(self.resultdir, subdir) 398 if os.path.exists(outputdir): 399 msg = ("%s already exists, test <%s> may have" 400 " already run with tag <%s>" 401 % (outputdir, testname, tag) ) 402 raise error.TestError(msg) 403 os.mkdir(outputdir) 404 405 container = dargs.pop('container', None) 406 if container: 407 cname = container.get('name', None) 408 if not cname: # get old name 409 cname = container.get('container_name', None) 410 mbytes = container.get('mbytes', None) 411 if not mbytes: # get old name 412 mbytes = container.get('mem', None) 413 cpus = container.get('cpus', None) 414 if not cpus: # get old name 415 cpus = container.get('cpu', None) 416 root = container.get('root', None) 417 network = container.get('network', None) 418 disk = container.get('disk', None) 419 try: 420 self.new_container(mbytes=mbytes, cpus=cpus, root=root, 421 name=cname, network=network, disk=disk) 422 except cpuset.CpusetsNotAvailable, e: 423 self.record("START", subdir, testname) 424 self._increment_group_level() 425 self.record("ERROR", subdir, testname, str(e)) 426 self._decrement_group_level() 427 self.record("END ERROR", subdir, testname) 428 return False 429 # We are running in a container now... 
430 431 def log_warning(reason): 432 self.record("WARN", subdir, testname, reason) 433 @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate) 434 def group_func(): 435 try: 436 self._runtest(url, tag, args, dargs) 437 except error.TestBaseException, detail: 438 self.record(detail.exit_status, subdir, testname, 439 str(detail)) 440 raise 441 except Exception, detail: 442 self.record('FAIL', subdir, testname, 443 str(detail)) 444 raise 445 else: 446 self.record('GOOD', subdir, testname, 447 'completed successfully') 448 449 result, exc_info = self._rungroup(subdir, testname, group_func) 450 if container: 451 self.release_container() 452 if exc_info and isinstance(exc_info[1], error.TestBaseException): 453 return False 454 elif exc_info: 455 raise exc_info[0], exc_info[1], exc_info[2] 456 else: 457 return True 458 459 460 def _rungroup(self, subdir, testname, function, *args, **dargs): 461 """\ 462 subdir: 463 name of the group 464 testname: 465 name of the test to run, or support step 466 function: 467 subroutine to run 468 *args: 469 arguments for the function 470 471 Returns a 2-tuple (result, exc_info) where result 472 is the return value of function, and exc_info is 473 the sys.exc_info() of the exception thrown by the 474 function (which may be None). 
475 """ 476 477 result, exc_info = None, None 478 try: 479 self.record('START', subdir, testname) 480 self._increment_group_level() 481 result = function(*args, **dargs) 482 self._decrement_group_level() 483 self.record('END GOOD', subdir, testname) 484 except error.TestBaseException, e: 485 exc_info = sys.exc_info() 486 self._decrement_group_level() 487 self.record('END %s' % e.exit_status, subdir, testname, str(e)) 488 except Exception, e: 489 exc_info = sys.exc_info() 490 self._decrement_group_level() 491 err_msg = str(e) + '\n' + traceback.format_exc() 492 self.record('END FAIL', subdir, testname, err_msg) 493 494 return result, exc_info 495 496 497 def run_group(self, function, *args, **dargs): 498 """\ 499 function: 500 subroutine to run 501 *args: 502 arguments for the function 503 """ 504 505 # Allow the tag for the group to be specified 506 name = function.__name__ 507 tag = dargs.pop('tag', None) 508 if tag: 509 name = tag 510 511 outputdir = os.path.join(self.resultdir, name) 512 if os.path.exists(outputdir): 513 msg = ("%s already exists, test <%s> may have" 514 " already run with tag <%s>" 515 % (outputdir, name, name) ) 516 raise error.TestError(msg) 517 os.mkdir(outputdir) 518 519 result, exc_info = self._rungroup(name, name, function, *args, **dargs) 520 521 # if there was a non-TestError exception, raise it 522 if exc_info and not isinstance(exc_info[1], error.TestError): 523 err = ''.join(traceback.format_exception(*exc_info)) 524 raise error.TestError(name + ' failed\n' + err) 525 526 # pass back the actual return value from the function 527 return result 528 529 530 def set_test_tag(self, tag=''): 531 # set tag to be added to test name of all following run_test() steps 532 self.testtag = tag 533 534 535 def new_container(self, mbytes=None, cpus=None, root=None, name=None, 536 network=None, disk=None, kswapd_merge=False): 537 if not autotest_utils.grep('cpuset', '/proc/filesystems'): 538 print "Containers not enabled by latest reboot" 539 return 
# containers weren't enabled in this kernel boot 540 pid = os.getpid() 541 if not name: 542 name = 'test%d' % pid # make arbitrary unique name 543 self.container = cpuset.cpuset(name, job_size=mbytes, job_pid=pid, 544 cpus=cpus, root=root, network=network, 545 disk=disk, kswapd_merge=kswapd_merge) 546 # This job's python shell is now running in the new container 547 # and all forked test processes will inherit that container 548 549 550 def release_container(self): 551 if self.container: 552 self.container = self.container.release() 553 554 555 def cpu_count(self): 556 if self.container: 557 return len(self.container.cpus) 558 return autotest_utils.count_cpus() # use total system count 559 560 561 def end_reboot(self, subdir, kernel, patches): 562 kernel_info = {"kernel": kernel} 563 for i, patch in enumerate(patches): 564 kernel_info["patch%d" % i] = patch 565 self._decrement_group_level() 566 self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info) 567 568 569 def end_reboot_and_verify(self, expected_when, expected_id, subdir, 570 type='src', patches=[]): 571 """ Check the passed kernel identifier against the command line 572 and the running kernel, abort the job on missmatch. """ 573 574 print (("POST BOOT: checking booted kernel " + 575 "mark=%d identity='%s' type='%s'") % 576 (expected_when, expected_id, type)) 577 578 running_id = autotest_utils.running_os_ident() 579 580 cmdline = utils.read_one_line("/proc/cmdline") 581 582 find_sum = re.compile(r'.*IDENT=(\d+)') 583 m = find_sum.match(cmdline) 584 cmdline_when = -1 585 if m: 586 cmdline_when = int(m.groups()[0]) 587 588 # We have all the facts, see if they indicate we 589 # booted the requested kernel or not. 
590 bad = False 591 if (type == 'src' and expected_id != running_id or 592 type == 'rpm' and 593 not running_id.startswith(expected_id + '::')): 594 print "check_kernel_ident: kernel identifier mismatch" 595 bad = True 596 if expected_when != cmdline_when: 597 print "check_kernel_ident: kernel command line mismatch" 598 bad = True 599 600 if bad: 601 print " Expected Ident: " + expected_id 602 print " Running Ident: " + running_id 603 print " Expected Mark: %d" % (expected_when) 604 print "Command Line Mark: %d" % (cmdline_when) 605 print " Command Line: " + cmdline 606 607 self.record("ABORT", subdir, "reboot.verify", "boot failure") 608 self._decrement_group_level() 609 kernel = {"kernel": running_id.split("::")[0]} 610 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) 611 raise error.JobError("reboot returned with the wrong kernel") 612 613 self.record('GOOD', subdir, 'reboot.verify', expected_id) 614 self.end_reboot(subdir, expected_id, patches) 615 616 617 def filesystem(self, device, mountpoint = None, loop_size = 0): 618 if not mountpoint: 619 mountpoint = self.tmpdir 620 return filesystem.filesystem(self, device, mountpoint,loop_size) 621 622 623 def enable_external_logging(self): 624 pass 625 626 627 def disable_external_logging(self): 628 pass 629 630 631 def enable_test_cleanup(self): 632 """ By default tests run test.cleanup """ 633 self.set_state("__run_test_cleanup", True) 634 self.run_test_cleanup = True 635 636 637 def disable_test_cleanup(self): 638 """ By default tests do not run test.cleanup """ 639 self.set_state("__run_test_cleanup", False) 640 self.run_test_cleanup = False 641 642 643 def reboot_setup(self): 644 pass 645 646 647 def reboot(self, tag='autotest'): 648 self.reboot_setup() 649 self.record('START', None, 'reboot') 650 self._increment_group_level() 651 self.record('GOOD', None, 'reboot.start') 652 self.harness.run_reboot() 653 default = self.config_get('boot.set_default') 654 if default: 655 
self.bootloader.set_default(tag) 656 else: 657 self.bootloader.boot_once(tag) 658 659 # HACK: using this as a module sometimes hangs shutdown, so if it's 660 # installed unload it first 661 utils.system("modprobe -r netconsole", ignore_status=True) 662 663 cmd = "(sleep 5; reboot) </dev/null >/dev/null 2>&1 &" 664 utils.system(cmd) 665 self.quit() 666 667 668 def noop(self, text): 669 print "job: noop: " + text 670 671 672 @_run_test_complete_on_exit 673 def parallel(self, *tasklist): 674 """Run tasks in parallel""" 675 676 pids = [] 677 old_log_filename = self.log_filename 678 for i, task in enumerate(tasklist): 679 self.log_filename = old_log_filename + (".%d" % i) 680 task_func = lambda: task[0](*task[1:]) 681 pids.append(parallel.fork_start(self.resultdir, task_func)) 682 683 old_log_path = os.path.join(self.resultdir, old_log_filename) 684 old_log = open(old_log_path, "a") 685 exceptions = [] 686 for i, pid in enumerate(pids): 687 # wait for the task to finish 688 try: 689 parallel.fork_waitfor(self.resultdir, pid) 690 except Exception, e: 691 exceptions.append(e) 692 # copy the logs from the subtask into the main log 693 new_log_path = old_log_path + (".%d" % i) 694 if os.path.exists(new_log_path): 695 new_log = open(new_log_path) 696 old_log.write(new_log.read()) 697 new_log.close() 698 old_log.flush() 699 os.remove(new_log_path) 700 old_log.close() 701 702 self.log_filename = old_log_filename 703 704 # handle any exceptions raised by the parallel tasks 705 if exceptions: 706 msg = "%d task(s) failed in job.parallel" % len(exceptions) 707 raise error.JobError(msg) 708 709 710 def quit(self): 711 # XXX: should have a better name. 712 self.harness.run_pause() 713 raise error.JobContinue("more to come") 714 715 716 def complete(self, status): 717 """Clean up and exit""" 718 # We are about to exit 'complete' so clean up the control file. 
719 dest = os.path.join(self.resultdir, os.path.basename(self.state_file)) 720 os.rename(self.state_file, dest) 721 722 self.harness.run_complete() 723 self.disable_external_logging() 724 sys.exit(status) 725 726 727 def set_state(self, var, val): 728 # Deep copies make sure that the state can't be altered 729 # without it being re-written. Perf wise, deep copies 730 # are overshadowed by pickling/loading. 731 self.state[var] = copy.deepcopy(val) 732 pickle.dump(self.state, open(self.state_file, 'w')) 733 print "Persistant state variable %s now set to %r" % (var, val) 734 735 736 def _load_state(self): 737 assert not hasattr(self, "state") 738 try: 739 self.state = pickle.load(open(self.state_file, 'r')) 740 self.state_existed = True 741 except Exception: 742 print "Initializing the state engine." 743 self.state = {} 744 self.set_state('__steps', []) # writes pickle file 745 self.state_existed = False 746 747 748 def get_state(self, var, default=None): 749 if var in self.state or default == None: 750 val = self.state[var] 751 else: 752 val = default 753 return copy.deepcopy(val) 754 755 756 def __create_step_tuple(self, fn, args, dargs): 757 # Legacy code passes in an array where the first arg is 758 # the function or its name. 759 if isinstance(fn, list): 760 assert(len(args) == 0) 761 assert(len(dargs) == 0) 762 args = fn[1:] 763 fn = fn[0] 764 # Pickling actual functions is harry, thus we have to call 765 # them by name. Unfortunately, this means only functions 766 # defined globally can be used as a next step. 
767 if callable(fn): 768 fn = fn.__name__ 769 if not isinstance(fn, types.StringTypes): 770 raise StepError("Next steps must be functions or " 771 "strings containing the function name") 772 ancestry = copy.copy(self.current_step_ancestry) 773 return (ancestry, fn, args, dargs) 774 775 776 def next_step_append(self, fn, *args, **dargs): 777 """Define the next step and place it at the end""" 778 steps = self.get_state('__steps') 779 steps.append(self.__create_step_tuple(fn, args, dargs)) 780 self.set_state('__steps', steps) 781 782 783 def next_step(self, fn, *args, **dargs): 784 """Create a new step and place it after any steps added 785 while running the current step but before any steps added in 786 previous steps""" 787 steps = self.get_state('__steps') 788 steps.insert(self.next_step_index, 789 self.__create_step_tuple(fn, args, dargs)) 790 self.next_step_index += 1 791 self.set_state('__steps', steps) 792 793 794 def next_step_prepend(self, fn, *args, **dargs): 795 """Insert a new step, executing first""" 796 steps = self.get_state('__steps') 797 steps.insert(0, self.__create_step_tuple(fn, args, dargs)) 798 self.next_step_index += 1 799 self.set_state('__steps', steps) 800 801 802 def _run_step_fn(self, local_vars, fn, args, dargs): 803 """Run a (step) function within the given context""" 804 805 local_vars['__args'] = args 806 local_vars['__dargs'] = dargs 807 exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars) 808 return local_vars['__ret'] 809 810 811 def _create_frame(self, global_vars, ancestry, fn_name): 812 """Set up the environment like it would have been when this 813 function was first defined. 814 815 Child step engine 'implementations' must have 'return locals()' 816 at end end of their steps. Because of this, we can call the 817 parent function and get back all child functions (i.e. those 818 defined within it). 
819 820 Unfortunately, the call stack of the function calling 821 job.next_step might have been deeper than the function it 822 added. In order to make sure that the environment is what it 823 should be, we need to then pop off the frames we built until 824 we find the frame where the function was first defined.""" 825 826 # The copies ensure that the parent frames are not modified 827 # while building child frames. This matters if we then 828 # pop some frames in the next part of this function. 829 current_frame = copy.copy(global_vars) 830 frames = [current_frame] 831 for steps_fn_name in ancestry: 832 ret = self._run_step_fn(current_frame, steps_fn_name, [], {}) 833 current_frame = copy.copy(ret) 834 frames.append(current_frame) 835 836 while len(frames) > 2: 837 if fn_name not in frames[-2]: 838 break 839 if frames[-2][fn_name] != frames[-1][fn_name]: 840 break 841 frames.pop() 842 ancestry.pop() 843 844 return (frames[-1], ancestry) 845 846 847 def _add_step_init(self, local_vars, current_function): 848 """If the function returned a dictionary that includes a 849 function named 'step_init', prepend it to our list of steps. 850 This will only get run the first time a function with a nested 851 use of the step engine is run.""" 852 853 if (isinstance(local_vars, dict) and 854 'step_init' in local_vars and 855 callable(local_vars['step_init'])): 856 # The init step is a child of the function 857 # we were just running. 858 self.current_step_ancestry.append(current_function) 859 self.next_step_prepend('step_init') 860 861 862 def step_engine(self): 863 """the stepping engine -- if the control file defines 864 step_init we will be using this engine to drive multiple runs. 865 """ 866 """Do the next step""" 867 868 # Set up the environment and then interpret the control file. 869 # Some control files will have code outside of functions, 870 # which means we need to have our state engine initialized 871 # before reading in the file. 
872 global_control_vars = {'job': self} 873 exec(JOB_PREAMBLE, global_control_vars, global_control_vars) 874 execfile(self.control, global_control_vars, global_control_vars) 875 876 # If we loaded in a mid-job state file, then we presumably 877 # know what steps we have yet to run. 878 if not self.state_existed: 879 if global_control_vars.has_key('step_init'): 880 self.next_step(global_control_vars['step_init']) 881 882 # Iterate through the steps. If we reboot, we'll simply 883 # continue iterating on the next step. 884 while len(self.get_state('__steps')) > 0: 885 steps = self.get_state('__steps') 886 (ancestry, fn_name, args, dargs) = steps.pop(0) 887 self.set_state('__steps', steps) 888 889 self.next_step_index = 0 890 ret = self._create_frame(global_control_vars, ancestry, fn_name) 891 local_vars, self.current_step_ancestry = ret 892 local_vars = self._run_step_fn(local_vars, fn_name, args, dargs) 893 self._add_step_init(local_vars, fn_name) 894 895 896 def _init_group_level(self): 897 self.group_level = self.get_state("__group_level", default=0) 898 899 900 def _increment_group_level(self): 901 self.group_level += 1 902 self.set_state("__group_level", self.group_level) 903 904 905 def _decrement_group_level(self): 906 self.group_level -= 1 907 self.set_state("__group_level", self.group_level) 908 909 910 def record(self, status_code, subdir, operation, status = '', 911 optional_fields=None): 912 """ 913 Record job-level status 914 915 The intent is to make this file both machine parseable and 916 human readable. That involves a little more complexity, but 917 really isn't all that bad ;-) 918 919 Format is <status code>\t<subdir>\t<operation>\t<status> 920 921 status code: (GOOD|WARN|FAIL|ABORT) 922 or START 923 or END (GOOD|WARN|FAIL|ABORT) 924 925 subdir: MUST be a relevant subdirectory in the results, 926 or None, which will be represented as '----' 927 928 operation: description of what you ran (e.g. 
"dbench", or 929 "mkfs -t foobar /dev/sda9") 930 931 status: error message or "completed sucessfully" 932 933 ------------------------------------------------------------ 934 935 Initial tabs indicate indent levels for grouping, and is 936 governed by self.group_level 937 938 multiline messages have secondary lines prefaced by a double 939 space (' ') 940 """ 941 942 if subdir: 943 if re.match(r'[\n\t]', subdir): 944 raise ValueError("Invalid character in subdir string") 945 substr = subdir 946 else: 947 substr = '----' 948 949 if not log.is_valid_status(status_code): 950 raise ValueError("Invalid status code supplied: %s" % status_code) 951 if not operation: 952 operation = '----' 953 954 if re.match(r'[\n\t]', operation): 955 raise ValueError("Invalid character in operation string") 956 operation = operation.rstrip() 957 958 if not optional_fields: 959 optional_fields = {} 960 961 status = status.rstrip() 962 status = re.sub(r"\r", "", status) 963 status = re.sub(r"\t", " ", status) 964 # Ensure any continuation lines are marked so we can 965 # detect them in the status file to ensure it is parsable. 966 status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", status) 967 968 # Generate timestamps for inclusion in the logs 969 epoch_time = int(time.time()) # seconds since epoch, in UTC 970 local_time = time.localtime(epoch_time) 971 optional_fields["timestamp"] = str(epoch_time) 972 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", 973 local_time) 974 975 fields = [status_code, substr, operation] 976 fields += ["%s=%s" % x for x in optional_fields.iteritems()] 977 fields.append(status) 978 979 msg = '\t'.join(str(x) for x in fields) 980 msg = '\t' * self.group_level + msg 981 982 msg_tag = "" 983 if "." 
class disk_usage_monitor(object):
    """Warns when free space on a device is consumed too quickly.

    Free space is sampled by start() and again by stop().  If the average
    consumption rate over that interval is above max_mb_per_hour, a
    message describing the rate is handed to the logging function.
    """

    def __init__(self, logging_func, device, max_mb_per_hour):
        """
        logging_func
                Callable invoked with one string argument when the
                usage rate limit is exceeded.
        device
                Handed to autotest_utils.freespace() to measure the
                remaining free space.
        max_mb_per_hour
                Highest tolerated consumption rate.  Any false value
                (None, 0) disables the check in stop().
        """
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour


    def start(self):
        """Record the free space and the wall-clock time at the start."""
        self.initial_space = autotest_utils.freespace(self.device)
        self.start_time = time.time()


    def stop(self):
        """Measure the space used since start() and warn if it is too much.

        start() must have been called first; it provides the
        initial_space and start_time attributes read here.
        """
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return

        final_space = autotest_utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)

        # determine the usage rate; the freespace() figures are treated
        # as byte counts by this conversion
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60

        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)


    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom.

        monitor_args / monitor_dargs
                Forwarded unchanged to the monitor constructor each
                time the decorated function is called.

        The decorated function's return value is passed back to the
        caller, and stop() runs even if the function raises.
        """
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    # hand the wrapped function's result back to the
                    # caller instead of silently discarding it
                    return func(*args, **dargs)
                finally:
                    monitor.stop()
            # keep the wrapped function's identity visible, the same
            # way the other decorator in this file does
            watched_func.__name__ = func.__name__
            watched_func.__doc__ = func.__doc__
            watched_func.__dict__.update(func.__dict__)
            return watched_func
        return decorator
def runjob(control, cont=False, tag="default", harness_type='',
           use_external_logging=False):
    """Entry point of this module: run (or resume) one job.

    control
            Path of the control file to use for this job.
    cont
            Whether this is the continuation of a previously started job
    tag, harness_type, use_external_logging
            Passed through unchanged to the job constructor.

    Exits the process with status 5 on error.JobContinue and with
    status 1 on error.JobComplete or when the job object could not be
    created; otherwise finishes through myjob.complete().
    """
    control = os.path.abspath(control)
    state_path = control + '.state'

    # stays None until the job object has been built, so the handlers
    # below can tell whether there is anything to record against
    myjob = None
    try:
        # a missing control file is a job error
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")

        if cont:
            # When continuing, the absence of the state file means the
            # job already finished; do not try to continue it.
            if not os.path.exists(state_path):
                raise error.JobComplete("all done")
        elif cont == False and os.path.exists(state_path):
            # fresh run: drop the state left behind by an earlier one
            os.unlink(state_path)

        myjob = job(control, tag, cont, harness_type, use_external_logging)

        # Load in the users control file, may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except error.JobContinue:
        sys.exit(5)

    except error.JobComplete:
        sys.exit(1)

    except error.JobError:
        instance = sys.exc_info()[1]
        reason = instance.args[0]
        print("JOB ERROR: " + reason)
        if not myjob:
            sys.exit(1)
        # an optional second argument names the failing command
        command = None
        extra_args = instance.args[1:]
        if extra_args:
            command = extra_args[0]
        myjob.record('ABORT', None, command, reason)
        myjob._decrement_group_level()
        myjob.record('END ABORT', None, None, reason)
        assert myjob.group_level == 0
        myjob.complete(1)

    except Exception:
        failure = sys.exc_info()[1]
        msg = str(failure) + '\n' + traceback.format_exc()
        print("JOB ERROR: " + msg)
        if not myjob:
            sys.exit(1)
        myjob._decrement_group_level()
        myjob.record('END ABORT', None, None, msg)
        assert myjob.group_level == 0
        myjob.complete(1)

    # If we get here, then we assume the job is complete and good.
    myjob._decrement_group_level()
    myjob.record('END GOOD', None, None)
    assert myjob.group_level == 0

    myjob.complete(0)


# site_job.py may be nonexistent or empty; make sure that an appropriate
# site_job class is created nevertheless
try:
    from autotest_lib.client.bin.site_job import site_job
except ImportError:
    # empty stand-in so the job class below always has a site_job base
    class site_job(object):
        pass


# the job class used by runjob(): site-specific code first, then base_job
class job(site_job, base_job):
    pass