job.py revision c39fa9ada24f773079d6162383013827e4bc4292
1"""The main job wrapper 2 3This is the core infrastructure. 4 5Copyright Andy Whitcroft, Martin J. Bligh 2006 6""" 7 8import copy, os, platform, re, shutil, sys, time, traceback, types 9import cPickle as pickle 10from autotest_lib.client.bin import autotest_utils, parallel, kernel, xen 11from autotest_lib.client.bin import profilers, fd_stack, boottool, harness 12from autotest_lib.client.bin import config, sysinfo, cpuset, test, partition 13from autotest_lib.client.common_lib import error, barrier, log, utils 14from autotest_lib.client.common_lib import packages, debug 15 16LAST_BOOT_TAG = object() 17NO_DEFAULT = object() 18JOB_PREAMBLE = """ 19from autotest_lib.client.common_lib.error import * 20from autotest_lib.client.common_lib.utils import * 21from autotest_lib.client.bin.autotest_utils import * 22""" 23 24 25class StepError(error.AutotestError): 26 pass 27 28class NotAvailableError(error.AutotestError): 29 pass 30 31 32 33def _run_test_complete_on_exit(f): 34 """Decorator for job methods that automatically calls 35 self.harness.run_test_complete when the method exits, if appropriate.""" 36 def wrapped(self, *args, **dargs): 37 try: 38 return f(self, *args, **dargs) 39 finally: 40 if self.log_filename == self.DEFAULT_LOG_FILENAME: 41 self.harness.run_test_complete() 42 if self.drop_caches: 43 print "Dropping caches" 44 autotest_utils.drop_caches() 45 wrapped.__name__ = f.__name__ 46 wrapped.__doc__ = f.__doc__ 47 wrapped.__dict__.update(f.__dict__) 48 return wrapped 49 50 51class base_job(object): 52 """The actual job against which we do everything. 53 54 Properties: 55 autodir 56 The top level autotest directory (/usr/local/autotest). 57 Comes from os.environ['AUTODIR']. 
58 bindir 59 <autodir>/bin/ 60 libdir 61 <autodir>/lib/ 62 testdir 63 <autodir>/tests/ 64 configdir 65 <autodir>/config/ 66 site_testdir 67 <autodir>/site_tests/ 68 profdir 69 <autodir>/profilers/ 70 tmpdir 71 <autodir>/tmp/ 72 pkgdir 73 <autodir>/packages/ 74 resultdir 75 <autodir>/results/<jobtag> 76 toolsdir 77 <autodir>/tools/ 78 stdout 79 fd_stack object for stdout 80 stderr 81 fd_stack object for stderr 82 profilers 83 the profilers object for this job 84 harness 85 the server harness object for this job 86 config 87 the job configuration for this job 88 drop_caches_between_iterations 89 drop the pagecache between each iteration 90 """ 91 92 DEFAULT_LOG_FILENAME = "status" 93 94 def __init__(self, control, jobtag, cont, harness_type=None, 95 use_external_logging=False, drop_caches=True): 96 """ 97 Prepare a client side job object. 98 99 Args: 100 control: The control file (pathname of). 101 jobtag: The job tag string (eg "default"). 102 cont: If this is the continuation of this job. 103 harness_type: An alternative server harness. [None] 104 use_external_logging: If true, the enable_external_logging 105 method will be called during construction. [False] 106 drop_caches: If true, autotest_utils.drop_caches() is 107 called before and between all tests. 
[True] 108 """ 109 self.drop_caches = drop_caches 110 if self.drop_caches: 111 print "Dropping caches" 112 autotest_utils.drop_caches() 113 self.drop_caches_between_iterations = False 114 self.autodir = os.environ['AUTODIR'] 115 self.bindir = os.path.join(self.autodir, 'bin') 116 self.libdir = os.path.join(self.autodir, 'lib') 117 self.testdir = os.path.join(self.autodir, 'tests') 118 self.configdir = os.path.join(self.autodir, 'config') 119 self.site_testdir = os.path.join(self.autodir, 'site_tests') 120 self.profdir = os.path.join(self.autodir, 'profilers') 121 self.tmpdir = os.path.join(self.autodir, 'tmp') 122 self.toolsdir = os.path.join(self.autodir, 'tools') 123 self.resultdir = os.path.join(self.autodir, 'results', jobtag) 124 125 self.control = os.path.realpath(control) 126 self._is_continuation = cont 127 self.state_file = self.control + '.state' 128 self.current_step_ancestry = [] 129 self.next_step_index = 0 130 self.testtag = '' 131 self._test_tag_prefix = '' 132 133 self._load_state() 134 self.pkgmgr = packages.PackageManager( 135 self.autodir, run_function_dargs={'timeout':1800}) 136 self.pkgdir = os.path.join(self.autodir, 'packages') 137 self.run_test_cleanup = self.get_state("__run_test_cleanup", 138 default=True) 139 140 self.sysinfo = sysinfo.sysinfo(self.resultdir) 141 self._load_sysinfo_state() 142 143 self.last_boot_tag = self.get_state("__last_boot_tag", default=None) 144 self.job_log = debug.get_logger(module='client') 145 146 if not cont: 147 """ 148 Don't cleanup the tmp dir (which contains the lockfile) 149 in the constructor, this would be a problem for multiple 150 jobs starting at the same time on the same client. Instead 151 do the delete at the server side. We simply create the tmp 152 directory here if it does not already exist. 
153 """ 154 if not os.path.exists(self.tmpdir): 155 os.mkdir(self.tmpdir) 156 157 if not os.path.exists(self.pkgdir): 158 os.mkdir(self.pkgdir) 159 160 results = os.path.join(self.autodir, 'results') 161 if not os.path.exists(results): 162 os.mkdir(results) 163 164 download = os.path.join(self.testdir, 'download') 165 if not os.path.exists(download): 166 os.mkdir(download) 167 168 if os.path.exists(self.resultdir): 169 utils.system('rm -rf ' + self.resultdir) 170 os.mkdir(self.resultdir) 171 172 os.mkdir(os.path.join(self.resultdir, 'debug')) 173 os.mkdir(os.path.join(self.resultdir, 'analysis')) 174 175 shutil.copyfile(self.control, 176 os.path.join(self.resultdir, 'control')) 177 178 179 self.control = control 180 self.jobtag = jobtag 181 self.log_filename = self.DEFAULT_LOG_FILENAME 182 self.container = None 183 184 self.stdout = fd_stack.fd_stack(1, sys.stdout) 185 self.stderr = fd_stack.fd_stack(2, sys.stderr) 186 187 self._init_group_level() 188 189 self.config = config.config(self) 190 self.harness = harness.select(harness_type, self) 191 self.profilers = profilers.profilers(self) 192 193 try: 194 tool = self.config_get('boottool.executable') 195 self.bootloader = boottool.boottool(tool) 196 except: 197 pass 198 199 self.sysinfo.log_per_reboot_data() 200 201 if not cont: 202 self.record('START', None, None) 203 self._increment_group_level() 204 205 self.harness.run_start() 206 207 if use_external_logging: 208 self.enable_external_logging() 209 210 # load the max disk usage rate - default to no monitoring 211 self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0) 212 213 214 def monitor_disk_usage(self, max_rate): 215 """\ 216 Signal that the job should monitor disk space usage on / 217 and generate a warning if a test uses up disk space at a 218 rate exceeding 'max_rate'. 219 220 Parameters: 221 max_rate - the maximium allowed rate of disk consumption 222 during a test, in MB/hour, or 0 to indicate 223 no limit. 
224 """ 225 self.set_state('__monitor_disk', max_rate) 226 self.max_disk_usage_rate = max_rate 227 228 229 def relative_path(self, path): 230 """\ 231 Return a patch relative to the job results directory 232 """ 233 head = len(self.resultdir) + 1 # remove the / inbetween 234 return path[head:] 235 236 237 def control_get(self): 238 return self.control 239 240 241 def control_set(self, control): 242 self.control = os.path.abspath(control) 243 244 245 def harness_select(self, which): 246 self.harness = harness.select(which, self) 247 248 249 def config_set(self, name, value): 250 self.config.set(name, value) 251 252 253 def config_get(self, name): 254 return self.config.get(name) 255 256 257 def setup_dirs(self, results_dir, tmp_dir): 258 if not tmp_dir: 259 tmp_dir = os.path.join(self.tmpdir, 'build') 260 if not os.path.exists(tmp_dir): 261 os.mkdir(tmp_dir) 262 if not os.path.isdir(tmp_dir): 263 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 264 raise ValueError(e_msg) 265 266 # We label the first build "build" and then subsequent ones 267 # as "build.2", "build.3", etc. Whilst this is a little bit 268 # inconsistent, 99.9% of jobs will only have one build 269 # (that's not done as kernbench, sparse, or buildtest), 270 # so it works out much cleaner. One of life's comprimises. 
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            # Probe build, build.2, build.3, ... until we find a free name.
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
                            kjob = None ):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)


    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir,
                                  build_dir, leave)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.
        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory in which the source is actually
                      untarred into. (ex: client/profilers/<name> for profilers)
        '''
        # No-op when no package repositories are configured; callers then
        # fall back to whatever is present locally.
        if len(self.pkgmgr.repo_urls) > 0:
            self.pkgmgr.install_pkg(name, pkg_type,
                                    self.pkgdir, install_dir)


    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a string list
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        # TODO(aganti): Validate the list of the repository URLs.
        # New URLs are prepended so they take precedence over existing ones.
        repositories = repo_urls + self.pkgmgr.repo_urls
        self.pkgmgr = packages.PackageManager(
            self.autodir, repo_urls=repositories,
            run_function_dargs={'timeout':1800})
        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE, checksum_file_path,
                                  use_checksum=False)
        except packages.PackageFetchError, e:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass


    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.

        Raises NotAvailableError if 'which gcc' fails.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError, e:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')


    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except packages.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            # Each dep ships a <dep>.py setup script run from its own dir.
            os.chdir(dep_dir)
            utils.system('./' + dep + '.py')


    def _runtest(self, url, tag, args, dargs):
        """Fork and run a single test, classifying any raised exception.

        Runs test.runtest in a forked child (via parallel.fork_start) and
        waits for it; exceptions that are not already Autotest-classified
        are wrapped in UnhandledTestError.
        """
        try:
            try:
                l = lambda : test.runtest(self, url, tag, args, dargs)
                pid = parallel.fork_start(self.resultdir, l)
                parallel.fork_waitfor(self.resultdir, pid)
            except error.TestBaseException:
                # These are already classified with an error type (exit_status)
                raise
            except error.JobError:
                raise  # Caught further up and turned into an ABORT.
            except Exception, e:
                # Converts all other exceptions thrown by the test regardless
                # of phase into a TestError(TestBaseException) subclass that
                # reports them with their full stack trace.
                raise error.UnhandledTestError(e)
        finally:
            # Reset the logging level to client level
            debug.configure(module='client')


    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run
        """

        if not url:
            raise TypeError("Test name is invalid. "
                            "Switched arguments?")
        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
        namelen = len(testname)
        dargs = dargs.copy()
        tntag = dargs.pop('tag', None)
        if tntag:  # per-test tag is included in reported test name
            testname += '.' + str(tntag)
        run_number = self.get_run_number()
        if run_number:
            testname += '._%02d_' % run_number
            self.set_run_number(run_number + 1)
        if self._is_kernel_in_test_tag():
            testname += '.' + platform.release()
        if self._test_tag_prefix:
            testname += '.' + self._test_tag_prefix
        if self.testtag:  # job-level tag is included in reported test name
            testname += '.' + self.testtag
        subdir = testname
        sdtag = dargs.pop('subdir_tag', None)
        if sdtag:  # subdir-only tag is not included in reports
            subdir = subdir + '.' + str(sdtag)
        tag = subdir[namelen+1:]    # '' if none

        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>"
                   % (outputdir, testname, tag) )
            raise error.TestError(msg)
        # NOTE: client/common_lib/test.py runtest() depends directory names
        # being constructed the same way as in this code.
        os.mkdir(outputdir)

        # Optional cpuset container: accept both the new and the legacy
        # key names from the container dict.
        container = dargs.pop('container', None)
        if container:
            cname = container.get('name', None)
            if not cname:   # get old name
                cname = container.get('container_name', None)
            mbytes = container.get('mbytes', None)
            if not mbytes:  # get old name
                mbytes = container.get('mem', None)
            cpus = container.get('cpus', None)
            if not cpus:    # get old name
                cpus = container.get('cpu', None)
            root = container.get('root', None)
            network = container.get('network', None)
            disk = container.get('disk', None)
            try:
                self.new_container(mbytes=mbytes, cpus=cpus, root=root,
                                   name=cname, network=network, disk=disk)
            except cpuset.CpusetsNotAvailable, e:
                # Record the failure as a complete (START..END ERROR) group
                # and bail out without running the test.
                self.record("START", subdir, testname)
                self._increment_group_level()
                self.record("ERROR", subdir, testname, str(e))
                self._decrement_group_level()
                self.record("END ERROR", subdir, testname)
                return False
            # We are running in a container now...

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)
        # Wrap the test run with disk usage monitoring; warnings are logged
        # via log_warning when the job's max_disk_usage_rate is exceeded.
        @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            self._rungroup(subdir, testname, group_func)
            if container:
                self.release_container()
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.


    def _rungroup(self, subdir, testname, function, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed in function
        """

        try:
            self.record('START', subdir, testname)
            self._increment_group_level()
            result = function(*args, **dargs)
            self._decrement_group_level()
            self.record('END GOOD', subdir, testname)
            return result
        except error.TestBaseException, e:
            self._decrement_group_level()
            self.record('END %s' % e.exit_status, subdir, testname, str(e))
            raise
        except error.JobError, e:
            self._decrement_group_level()
            self.record('END ABORT', subdir, testname, str(e))
            raise
        except Exception, e:
            # This should only ever happen due to a bug in the given
            # function's code. The common case of being called by
            # run_test() will never reach this.
If a control file called 536 # run_group() itself, bugs in its function will be caught 537 # here. 538 self._decrement_group_level() 539 err_msg = str(e) + '\n' + traceback.format_exc() 540 self.record('END ERROR', subdir, testname, err_msg) 541 raise 542 543 544 def run_group(self, function, tag=None, **dargs): 545 """ 546 Run a function nested within a group level. 547 548 function: 549 Callable to run. 550 tag: 551 An optional tag name for the group. If None (default) 552 function.__name__ will be used. 553 **dargs: 554 Named arguments for the function. 555 """ 556 if tag: 557 name = tag 558 else: 559 name = function.__name__ 560 561 try: 562 return self._rungroup(subdir=None, testname=name, 563 function=function, **dargs) 564 except (SystemExit, error.TestBaseException): 565 raise 566 # If there was a different exception, turn it into a TestError. 567 # It will be caught by step_engine or _run_step_fn. 568 except Exception, e: 569 raise error.UnhandledTestError(e) 570 571 572 _RUN_NUMBER_STATE = '__run_number' 573 def get_run_number(self): 574 """Get the run number or 0 if no run number has been set.""" 575 return self.get_state(self._RUN_NUMBER_STATE, default=0) 576 577 578 def set_run_number(self, number): 579 """If the run number is non-zero it will be in the output dir name.""" 580 self.set_state(self._RUN_NUMBER_STATE, number) 581 582 583 _KERNEL_IN_TAG_STATE = '__kernel_version_in_test_tag' 584 def _is_kernel_in_test_tag(self): 585 """Boolean: should the kernel version be included in the test tag.""" 586 return self.get_state(self._KERNEL_IN_TAG_STATE, default=False) 587 588 589 def show_kernel_in_test_tag(self, value=True): 590 """If True, the kernel version at test time will prefix test tags.""" 591 self.set_state(self._KERNEL_IN_TAG_STATE, value) 592 593 594 def set_test_tag(self, tag=''): 595 """Set tag to be added to test name of all following run_test steps.""" 596 self.testtag = tag 597 598 599 def set_test_tag_prefix(self, prefix): 600 """Set a 
prefix string to prepend to all future run_test steps. 601 602 Args: 603 prefix: A string to prepend to any run_test() step tags separated by 604 a '.'; use the empty string '' to clear it. 605 """ 606 self._test_tag_prefix = prefix 607 608 609 def new_container(self, mbytes=None, cpus=None, root=None, name=None, 610 network=None, disk=None, kswapd_merge=False): 611 if not autotest_utils.grep('cpuset', '/proc/filesystems'): 612 print "Containers not enabled by latest reboot" 613 return # containers weren't enabled in this kernel boot 614 pid = os.getpid() 615 if not name: 616 name = 'test%d' % pid # make arbitrary unique name 617 self.container = cpuset.cpuset(name, job_size=mbytes, job_pid=pid, 618 cpus=cpus, root=root, network=network, 619 disk=disk, kswapd_merge=kswapd_merge) 620 # This job's python shell is now running in the new container 621 # and all forked test processes will inherit that container 622 623 624 def release_container(self): 625 if self.container: 626 self.container = self.container.release() 627 628 629 def cpu_count(self): 630 if self.container: 631 return len(self.container.cpus) 632 return autotest_utils.count_cpus() # use total system count 633 634 635 def start_reboot(self): 636 self.record('START', None, 'reboot') 637 self._increment_group_level() 638 self.record('GOOD', None, 'reboot.start') 639 640 641 def end_reboot(self, subdir, kernel, patches): 642 kernel_info = {"kernel": kernel} 643 for i, patch in enumerate(patches): 644 kernel_info["patch%d" % i] = patch 645 self._decrement_group_level() 646 self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info) 647 648 649 def end_reboot_and_verify(self, expected_when, expected_id, subdir, 650 type='src', patches=[]): 651 """ Check the passed kernel identifier against the command line 652 and the running kernel, abort the job on missmatch. 
""" 653 654 print (("POST BOOT: checking booted kernel " + 655 "mark=%d identity='%s' type='%s'") % 656 (expected_when, expected_id, type)) 657 658 running_id = autotest_utils.running_os_ident() 659 660 cmdline = utils.read_one_line("/proc/cmdline") 661 662 find_sum = re.compile(r'.*IDENT=(\d+)') 663 m = find_sum.match(cmdline) 664 cmdline_when = -1 665 if m: 666 cmdline_when = int(m.groups()[0]) 667 668 # We have all the facts, see if they indicate we 669 # booted the requested kernel or not. 670 bad = False 671 if (type == 'src' and expected_id != running_id or 672 type == 'rpm' and 673 not running_id.startswith(expected_id + '::')): 674 print "check_kernel_ident: kernel identifier mismatch" 675 bad = True 676 if expected_when != cmdline_when: 677 print "check_kernel_ident: kernel command line mismatch" 678 bad = True 679 680 if bad: 681 print " Expected Ident: " + expected_id 682 print " Running Ident: " + running_id 683 print " Expected Mark: %d" % (expected_when) 684 print "Command Line Mark: %d" % (cmdline_when) 685 print " Command Line: " + cmdline 686 687 self.record("ABORT", subdir, "reboot.verify", "boot failure") 688 self._decrement_group_level() 689 kernel = {"kernel": running_id.split("::")[0]} 690 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel) 691 raise error.JobError("reboot returned with the wrong kernel") 692 693 self.record('GOOD', subdir, 'reboot.verify', expected_id) 694 self.end_reboot(subdir, expected_id, patches) 695 696 697 def filesystem(self, device, mountpoint = None, loop_size = 0): 698 if not mountpoint: 699 mountpoint = self.tmpdir 700 return partition.partition(self, device, mountpoint, loop_size) 701 702 703 def enable_external_logging(self): 704 pass 705 706 707 def disable_external_logging(self): 708 pass 709 710 711 def enable_test_cleanup(self): 712 """ By default tests run test.cleanup """ 713 self.set_state("__run_test_cleanup", True) 714 self.run_test_cleanup = True 715 716 717 def 
disable_test_cleanup(self): 718 """ By default tests do not run test.cleanup """ 719 self.set_state("__run_test_cleanup", False) 720 self.run_test_cleanup = False 721 722 723 def default_test_cleanup(self, val): 724 if not self._is_continuation: 725 self.set_state("__run_test_cleanup", val) 726 self.run_test_cleanup = val 727 728 729 def default_boot_tag(self, tag): 730 if not self._is_continuation: 731 self.set_state("__last_boot_tag", tag) 732 self.last_boot_tag = tag 733 734 735 def reboot_setup(self): 736 pass 737 738 739 def reboot(self, tag=LAST_BOOT_TAG): 740 if tag == LAST_BOOT_TAG: 741 tag = self.last_boot_tag 742 else: 743 self.set_state("__last_boot_tag", tag) 744 self.last_boot_tag = tag 745 746 self.reboot_setup() 747 self.harness.run_reboot() 748 default = self.config_get('boot.set_default') 749 if default: 750 self.bootloader.set_default(tag) 751 else: 752 self.bootloader.boot_once(tag) 753 754 # HACK: using this as a module sometimes hangs shutdown, so if it's 755 # installed unload it first 756 utils.system("modprobe -r netconsole", ignore_status=True) 757 758 cmd = "(sleep 5; reboot) </dev/null >/dev/null 2>&1 &" 759 utils.system(cmd) 760 self.quit() 761 762 763 def noop(self, text): 764 print "job: noop: " + text 765 766 767 @_run_test_complete_on_exit 768 def parallel(self, *tasklist): 769 """Run tasks in parallel""" 770 771 pids = [] 772 old_log_filename = self.log_filename 773 for i, task in enumerate(tasklist): 774 assert isinstance(task, (tuple, list)) 775 self.log_filename = old_log_filename + (".%d" % i) 776 task_func = lambda: task[0](*task[1:]) 777 pids.append(parallel.fork_start(self.resultdir, task_func)) 778 779 old_log_path = os.path.join(self.resultdir, old_log_filename) 780 old_log = open(old_log_path, "a") 781 exceptions = [] 782 for i, pid in enumerate(pids): 783 # wait for the task to finish 784 try: 785 parallel.fork_waitfor(self.resultdir, pid) 786 except Exception, e: 787 exceptions.append(e) 788 # copy the logs from the 
subtask into the main log 789 new_log_path = old_log_path + (".%d" % i) 790 if os.path.exists(new_log_path): 791 new_log = open(new_log_path) 792 old_log.write(new_log.read()) 793 new_log.close() 794 old_log.flush() 795 os.remove(new_log_path) 796 old_log.close() 797 798 self.log_filename = old_log_filename 799 800 # handle any exceptions raised by the parallel tasks 801 if exceptions: 802 msg = "%d task(s) failed in job.parallel" % len(exceptions) 803 raise error.JobError(msg) 804 805 806 def quit(self): 807 # XXX: should have a better name. 808 self.harness.run_pause() 809 raise error.JobContinue("more to come") 810 811 812 def complete(self, status): 813 """Clean up and exit""" 814 # We are about to exit 'complete' so clean up the control file. 815 dest = os.path.join(self.resultdir, os.path.basename(self.state_file)) 816 os.rename(self.state_file, dest) 817 818 self.harness.run_complete() 819 self.disable_external_logging() 820 sys.exit(status) 821 822 823 def set_state(self, var, val): 824 # Deep copies make sure that the state can't be altered 825 # without it being re-written. Perf wise, deep copies 826 # are overshadowed by pickling/loading. 
827 self.state[var] = copy.deepcopy(val) 828 outfile = open(self.state_file, 'w') 829 try: 830 pickle.dump(self.state, outfile, pickle.HIGHEST_PROTOCOL) 831 finally: 832 outfile.close() 833 print "Persistent state variable %s now set to %r" % (var, val) 834 835 836 def _load_state(self): 837 if hasattr(self, 'state'): 838 raise RuntimeError('state already exists') 839 infile = None 840 try: 841 try: 842 infile = open(self.state_file, 'rb') 843 self.state = pickle.load(infile) 844 except IOError: 845 self.state = {} 846 initialize = True 847 finally: 848 if infile: 849 infile.close() 850 851 initialize = '__steps' not in self.state 852 if not (self._is_continuation or initialize): 853 raise RuntimeError('Loaded state must contain __steps or be a ' 854 'continuation.') 855 856 if initialize: 857 print 'Initializing the state engine.' 858 self.set_state('__steps', []) # writes pickle file 859 860 861 def get_state(self, var, default=NO_DEFAULT): 862 if var in self.state or default == NO_DEFAULT: 863 val = self.state[var] 864 else: 865 val = default 866 return copy.deepcopy(val) 867 868 869 def __create_step_tuple(self, fn, args, dargs): 870 # Legacy code passes in an array where the first arg is 871 # the function or its name. 872 if isinstance(fn, list): 873 assert(len(args) == 0) 874 assert(len(dargs) == 0) 875 args = fn[1:] 876 fn = fn[0] 877 # Pickling actual functions is hairy, thus we have to call 878 # them by name. Unfortunately, this means only functions 879 # defined globally can be used as a next step. 
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self.current_step_ancestry)
        return (ancestry, fn, args, dargs)


    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self.get_state('__steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self.set_state('__steps', steps)


    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self.get_state('__steps')
        steps.insert(self.next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self.next_step_index += 1
        self.set_state('__steps', steps)


    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self.get_state('__steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self.next_step_index += 1
        self.set_state('__steps', steps)


    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""

        # The function is invoked by name inside local_vars; its arguments
        # are smuggled in through the __args/__dargs bindings.
        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)


    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""

        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)


    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""

        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self.current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')


    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """

        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
1014 while len(self.get_state('__steps')) > 0: 1015 steps = self.get_state('__steps') 1016 (ancestry, fn_name, args, dargs) = steps.pop(0) 1017 self.set_state('__steps', steps) 1018 1019 self.next_step_index = 0 1020 ret = self._create_frame(global_control_vars, ancestry, fn_name) 1021 local_vars, self.current_step_ancestry = ret 1022 local_vars = self._run_step_fn(local_vars, fn_name, args, dargs) 1023 self._add_step_init(local_vars, fn_name) 1024 1025 1026 def add_sysinfo_command(self, command, logfile=None, on_every_test=False): 1027 self._add_sysinfo_loggable(sysinfo.command(command, logfile), 1028 on_every_test) 1029 1030 1031 def add_sysinfo_logfile(self, file, on_every_test=False): 1032 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test) 1033 1034 1035 def _add_sysinfo_loggable(self, loggable, on_every_test): 1036 if on_every_test: 1037 self.sysinfo.test_loggables.add(loggable) 1038 else: 1039 self.sysinfo.boot_loggables.add(loggable) 1040 self._save_sysinfo_state() 1041 1042 1043 def _load_sysinfo_state(self): 1044 state = self.get_state("__sysinfo", None) 1045 if state: 1046 self.sysinfo.deserialize(state) 1047 1048 1049 def _save_sysinfo_state(self): 1050 state = self.sysinfo.serialize() 1051 self.set_state("__sysinfo", state) 1052 1053 1054 def _init_group_level(self): 1055 self.group_level = self.get_state("__group_level", default=0) 1056 1057 1058 def _increment_group_level(self): 1059 self.group_level += 1 1060 self.set_state("__group_level", self.group_level) 1061 1062 1063 def _decrement_group_level(self): 1064 self.group_level -= 1 1065 self.set_state("__group_level", self.group_level) 1066 1067 1068 def record(self, status_code, subdir, operation, status = '', 1069 optional_fields=None): 1070 """ 1071 Record job-level status 1072 1073 The intent is to make this file both machine parseable and 1074 human readable. 
That involves a little more complexity, but 1075 really isn't all that bad ;-) 1076 1077 Format is <status code>\t<subdir>\t<operation>\t<status> 1078 1079 status code: (GOOD|WARN|FAIL|ABORT) 1080 or START 1081 or END (GOOD|WARN|FAIL|ABORT) 1082 1083 subdir: MUST be a relevant subdirectory in the results, 1084 or None, which will be represented as '----' 1085 1086 operation: description of what you ran (e.g. "dbench", or 1087 "mkfs -t foobar /dev/sda9") 1088 1089 status: error message or "completed sucessfully" 1090 1091 ------------------------------------------------------------ 1092 1093 Initial tabs indicate indent levels for grouping, and is 1094 governed by self.group_level 1095 1096 multiline messages have secondary lines prefaced by a double 1097 space (' ') 1098 """ 1099 1100 if subdir: 1101 if re.match(r'[\n\t]', subdir): 1102 raise ValueError("Invalid character in subdir string") 1103 substr = subdir 1104 else: 1105 substr = '----' 1106 1107 if not log.is_valid_status(status_code): 1108 raise ValueError("Invalid status code supplied: %s" % status_code) 1109 if not operation: 1110 operation = '----' 1111 1112 if re.match(r'[\n\t]', operation): 1113 raise ValueError("Invalid character in operation string") 1114 operation = operation.rstrip() 1115 1116 if not optional_fields: 1117 optional_fields = {} 1118 1119 status = status.rstrip() 1120 status = re.sub(r"\t", " ", status) 1121 # Ensure any continuation lines are marked so we can 1122 # detect them in the status file to ensure it is parsable. 
1123 status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", status) 1124 1125 # Generate timestamps for inclusion in the logs 1126 epoch_time = int(time.time()) # seconds since epoch, in UTC 1127 local_time = time.localtime(epoch_time) 1128 optional_fields["timestamp"] = str(epoch_time) 1129 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", 1130 local_time) 1131 1132 fields = [status_code, substr, operation] 1133 fields += ["%s=%s" % x for x in optional_fields.iteritems()] 1134 fields.append(status) 1135 1136 msg = '\t'.join(str(x) for x in fields) 1137 msg = '\t' * self.group_level + msg 1138 1139 msg_tag = "" 1140 if "." in self.log_filename: 1141 msg_tag = self.log_filename.split(".", 1)[1] 1142 1143 self.harness.test_status_detail(status_code, substr, operation, status, 1144 msg_tag) 1145 self.harness.test_status(msg, msg_tag) 1146 1147 # log to stdout (if enabled) 1148 #if self.log_filename == self.DEFAULT_LOG_FILENAME: 1149 print msg 1150 1151 # log to the "root" status log 1152 status_file = os.path.join(self.resultdir, self.log_filename) 1153 open(status_file, "a").write(msg + "\n") 1154 1155 # log to the subdir status log (if subdir is set) 1156 if subdir: 1157 dir = os.path.join(self.resultdir, subdir) 1158 status_file = os.path.join(dir, self.DEFAULT_LOG_FILENAME) 1159 open(status_file, "a").write(msg + "\n") 1160 1161 1162class disk_usage_monitor: 1163 def __init__(self, logging_func, device, max_mb_per_hour): 1164 self.func = logging_func 1165 self.device = device 1166 self.max_mb_per_hour = max_mb_per_hour 1167 1168 1169 def start(self): 1170 self.initial_space = autotest_utils.freespace(self.device) 1171 self.start_time = time.time() 1172 1173 1174 def stop(self): 1175 # if no maximum usage rate was set, we don't need to 1176 # generate any warnings 1177 if not self.max_mb_per_hour: 1178 return 1179 1180 final_space = autotest_utils.freespace(self.device) 1181 used_space = self.initial_space - final_space 1182 stop_time = 
time.time() 1183 total_time = stop_time - self.start_time 1184 # round up the time to one minute, to keep extremely short 1185 # tests from generating false positives due to short, badly 1186 # timed bursts of activity 1187 total_time = max(total_time, 60.0) 1188 1189 # determine the usage rate 1190 bytes_per_sec = used_space / total_time 1191 mb_per_sec = bytes_per_sec / 1024**2 1192 mb_per_hour = mb_per_sec * 60 * 60 1193 1194 if mb_per_hour > self.max_mb_per_hour: 1195 msg = ("disk space on %s was consumed at a rate of %.2f MB/hour") 1196 msg %= (self.device, mb_per_hour) 1197 self.func(msg) 1198 1199 1200 @classmethod 1201 def watch(cls, *monitor_args, **monitor_dargs): 1202 """ Generic decorator to wrap a function call with the 1203 standard create-monitor -> start -> call -> stop idiom.""" 1204 def decorator(func): 1205 def watched_func(*args, **dargs): 1206 monitor = cls(*monitor_args, **monitor_dargs) 1207 monitor.start() 1208 try: 1209 func(*args, **dargs) 1210 finally: 1211 monitor.stop() 1212 return watched_func 1213 return decorator 1214 1215 1216def runjob(control, cont=False, tag="default", harness_type='', 1217 use_external_logging=False): 1218 """ 1219 Run a job using the given control file. 1220 1221 This is the main interface to this module. 1222 1223 Args: 1224 control: The control file to use for this job. 1225 cont: Whether this is the continuation of a previously started job. 1226 tag: The job tag string. ['default'] 1227 harness_type: An alternative server harness. [None] 1228 use_external_logging: Should external logging be enabled? [False] 1229 """ 1230 control = os.path.abspath(control) 1231 state = control + '.state' 1232 1233 # instantiate the job object ready for the control file. 
1234 myjob = None 1235 try: 1236 # Check that the control file is valid 1237 if not os.path.exists(control): 1238 raise error.JobError(control + ": control file not found") 1239 1240 # When continuing, the job is complete when there is no 1241 # state file, ensure we don't try and continue. 1242 if cont and not os.path.exists(state): 1243 raise error.JobComplete("all done") 1244 1245 myjob = job(control, tag, cont, harness_type=harness_type, 1246 use_external_logging=use_external_logging) 1247 1248 # Load in the users control file, may do any one of: 1249 # 1) execute in toto 1250 # 2) define steps, and select the first via next_step() 1251 myjob.step_engine() 1252 1253 except error.JobContinue: 1254 sys.exit(5) 1255 1256 except error.JobComplete: 1257 sys.exit(1) 1258 1259 except error.JobError, instance: 1260 print "JOB ERROR: " + instance.args[0] 1261 if myjob: 1262 command = None 1263 if len(instance.args) > 1: 1264 command = instance.args[1] 1265 myjob.record('ABORT', None, command, instance.args[0]) 1266 myjob._decrement_group_level() 1267 myjob.record('END ABORT', None, None, instance.args[0]) 1268 assert (myjob.group_level == 0), ('myjob.group_level must be 0,' 1269 ' not %d' % myjob.group_level) 1270 myjob.complete(1) 1271 else: 1272 sys.exit(1) 1273 1274 except Exception, e: 1275 # NOTE: job._run_step_fn and job.step_engine will turn things into 1276 # a JobError for us. If we get here, its likely an autotest bug. 1277 msg = str(e) + '\n' + traceback.format_exc() 1278 print "JOB ERROR (autotest bug?): " + msg 1279 if myjob: 1280 myjob._decrement_group_level() 1281 myjob.record('END ABORT', None, None, msg) 1282 assert(myjob.group_level == 0) 1283 myjob.complete(1) 1284 else: 1285 sys.exit(1) 1286 1287 # If we get here, then we assume the job is complete and good. 
1288 myjob._decrement_group_level() 1289 myjob.record('END GOOD', None, None) 1290 assert(myjob.group_level == 0) 1291 1292 myjob.complete(0) 1293 1294 1295# site_job.py may be non-existant or empty, make sure that an appropriate 1296# site_job class is created nevertheless 1297try: 1298 from autotest_lib.client.bin.site_job import site_job 1299except ImportError: 1300 class site_job(object): 1301 pass 1302 1303class job(site_job, base_job): 1304 pass 1305