# job.py revision c1f8cedaa425fa57813e4e8ce6db2b84676816ba
1"""The main job wrapper 2 3This is the core infrastructure. 4""" 5 6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006""" 7 8# standard stuff 9import os, sys, re, pickle, shutil, time, traceback, types, copy 10 11# autotest stuff 12from autotest_lib.client.bin import autotest_utils, parallel, kernel, xen 13from autotest_lib.client.bin import profilers, fd_stack, boottool, harness 14from autotest_lib.client.bin import config, sysinfo, cpuset, test, filesystem 15from autotest_lib.client.common_lib import error, barrier, logging, utils 16 17JOB_PREAMBLE = """ 18from autotest_lib.client.common_lib.error import * 19from autotest_lib.client.bin.autotest_utils import * 20""" 21 22class StepError(error.AutotestError): 23 pass 24 25 26class base_job(object): 27 """The actual job against which we do everything. 28 29 Properties: 30 autodir 31 The top level autotest directory (/usr/local/autotest). 32 Comes from os.environ['AUTODIR']. 33 bindir 34 <autodir>/bin/ 35 libdir 36 <autodir>/lib/ 37 testdir 38 <autodir>/tests/ 39 site_testdir 40 <autodir>/site_tests/ 41 profdir 42 <autodir>/profilers/ 43 tmpdir 44 <autodir>/tmp/ 45 resultdir 46 <autodir>/results/<jobtag> 47 stdout 48 fd_stack object for stdout 49 stderr 50 fd_stack object for stderr 51 profilers 52 the profilers object for this job 53 harness 54 the server harness object for this job 55 config 56 the job configuration for this job 57 """ 58 59 DEFAULT_LOG_FILENAME = "status" 60 61 def __init__(self, control, jobtag, cont, harness_type=None, 62 use_external_logging = False): 63 """ 64 control 65 The control file (pathname of) 66 jobtag 67 The job tag string (eg "default") 68 cont 69 If this is the continuation of this job 70 harness_type 71 An alternative server harness 72 """ 73 self.autodir = os.environ['AUTODIR'] 74 self.bindir = os.path.join(self.autodir, 'bin') 75 self.libdir = os.path.join(self.autodir, 'lib') 76 self.testdir = os.path.join(self.autodir, 'tests') 77 self.site_testdir = 
os.path.join(self.autodir, 'site_tests') 78 self.profdir = os.path.join(self.autodir, 'profilers') 79 self.tmpdir = os.path.join(self.autodir, 'tmp') 80 self.resultdir = os.path.join(self.autodir, 'results', jobtag) 81 self.sysinfodir = os.path.join(self.resultdir, 'sysinfo') 82 self.control = os.path.abspath(control) 83 self.state_file = self.control + '.state' 84 self.current_step_ancestry = [] 85 self.next_step_index = 0 86 self._load_state() 87 88 if not cont: 89 """ 90 Don't cleanup the tmp dir (which contains the lockfile) 91 in the constructor, this would be a problem for multiple 92 jobs starting at the same time on the same client. Instead 93 do the delete at the server side. We simply create the tmp 94 directory here if it does not already exist. 95 """ 96 if not os.path.exists(self.tmpdir): 97 os.mkdir(self.tmpdir) 98 99 results = os.path.join(self.autodir, 'results') 100 if not os.path.exists(results): 101 os.mkdir(results) 102 103 download = os.path.join(self.testdir, 'download') 104 if not os.path.exists(download): 105 os.mkdir(download) 106 107 if os.path.exists(self.resultdir): 108 utils.system('rm -rf ' + self.resultdir) 109 os.mkdir(self.resultdir) 110 os.mkdir(self.sysinfodir) 111 112 os.mkdir(os.path.join(self.resultdir, 'debug')) 113 os.mkdir(os.path.join(self.resultdir, 'analysis')) 114 115 shutil.copyfile(self.control, 116 os.path.join(self.resultdir, 'control')) 117 118 119 self.control = control 120 self.jobtag = jobtag 121 self.log_filename = self.DEFAULT_LOG_FILENAME 122 self.container = None 123 124 self.stdout = fd_stack.fd_stack(1, sys.stdout) 125 self.stderr = fd_stack.fd_stack(2, sys.stderr) 126 127 self._init_group_level() 128 129 self.config = config.config(self) 130 self.harness = harness.select(harness_type, self) 131 self.profilers = profilers.profilers(self) 132 133 try: 134 tool = self.config_get('boottool.executable') 135 self.bootloader = boottool.boottool(tool) 136 except: 137 pass 138 139 
sysinfo.log_per_reboot_data(self.sysinfodir) 140 141 if not cont: 142 self.record('START', None, None) 143 self._increment_group_level() 144 145 self.harness.run_start() 146 147 if use_external_logging: 148 self.enable_external_logging() 149 150 # load the max disk usage rate - default to no monitoring 151 self.max_disk_usage_rate = self.get_state('__monitor_disk', default=0.0) 152 153 154 def monitor_disk_usage(self, max_rate): 155 """\ 156 Signal that the job should monitor disk space usage on / 157 and generate a warning if a test uses up disk space at a 158 rate exceeding 'max_rate'. 159 160 Parameters: 161 max_rate - the maximium allowed rate of disk consumption 162 during a test, in MB/hour, or 0 to indicate 163 no limit. 164 """ 165 self.set_state('__monitor_disk', max_rate) 166 self.max_disk_usage_rate = max_rate 167 168 169 def relative_path(self, path): 170 """\ 171 Return a patch relative to the job results directory 172 """ 173 head = len(self.resultdir) + 1 # remove the / inbetween 174 return path[head:] 175 176 177 def control_get(self): 178 return self.control 179 180 181 def control_set(self, control): 182 self.control = os.path.abspath(control) 183 184 185 def harness_select(self, which): 186 self.harness = harness.select(which, self) 187 188 189 def config_set(self, name, value): 190 self.config.set(name, value) 191 192 193 def config_get(self, name): 194 return self.config.get(name) 195 196 197 def setup_dirs(self, results_dir, tmp_dir): 198 if not tmp_dir: 199 tmp_dir = os.path.join(self.tmpdir, 'build') 200 if not os.path.exists(tmp_dir): 201 os.mkdir(tmp_dir) 202 if not os.path.isdir(tmp_dir): 203 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 204 raise ValueError(e_msg) 205 206 # We label the first build "build" and then subsequent ones 207 # as "build.2", "build.3", etc. 
Whilst this is a little bit 208 # inconsistent, 99.9% of jobs will only have one build 209 # (that's not done as kernbench, sparse, or buildtest), 210 # so it works out much cleaner. One of life's comprimises. 211 if not results_dir: 212 results_dir = os.path.join(self.resultdir, 'build') 213 i = 2 214 while os.path.exists(results_dir): 215 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 216 i += 1 217 if not os.path.exists(results_dir): 218 os.mkdir(results_dir) 219 220 return (results_dir, tmp_dir) 221 222 223 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 224 kjob = None ): 225 """Summon a xen object""" 226 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 227 build_dir = 'xen' 228 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob) 229 230 231 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 232 """Summon a kernel object""" 233 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 234 build_dir = 'linux' 235 return kernel.auto_kernel(self, base_tree, results_dir, tmp_dir, 236 build_dir, leave) 237 238 239 def barrier(self, *args, **kwds): 240 """Create a barrier object""" 241 return barrier.barrier(*args, **kwds) 242 243 244 def setup_dep(self, deps): 245 """Set up the dependencies for this test. 246 247 deps is a list of libraries required for this test. 
248 """ 249 for dep in deps: 250 try: 251 os.chdir(os.path.join(self.autodir, 'deps', dep)) 252 utils.system('./' + dep + '.py') 253 except: 254 err = "setting up dependency " + dep + "\n" 255 raise error.UnhandledError(err) 256 257 258 def _runtest(self, url, tag, args, dargs): 259 try: 260 l = lambda : test.runtest(self, url, tag, args, dargs) 261 pid = parallel.fork_start(self.resultdir, l) 262 parallel.fork_waitfor(self.resultdir, pid) 263 except error.AutotestError: 264 raise 265 except Exception, e: 266 msg = "Unhandled %s error occured during test\n" 267 msg %= str(e.__class__.__name__) 268 raise error.UnhandledError(msg) 269 270 271 def run_test(self, url, *args, **dargs): 272 """Summon a test object and run it. 273 274 tag 275 tag to add to testname 276 url 277 url of the test to run 278 """ 279 280 if not url: 281 raise TypeError("Test name is invalid. " 282 "Switched arguments?") 283 (group, testname) = test.testname(url) 284 namelen = len(testname) 285 dargs = dargs.copy() 286 tntag = dargs.pop('tag', None) 287 if tntag: # testname tag is included in reported test name 288 testname += '.' + tntag 289 subdir = testname 290 sdtag = dargs.pop('subdir_tag', None) 291 if sdtag: # subdir-only tag is not included in reports 292 subdir = subdir + '.' 
+ sdtag 293 tag = subdir[namelen+1:] # '' if none 294 295 outputdir = os.path.join(self.resultdir, subdir) 296 if os.path.exists(outputdir): 297 msg = ("%s already exists, test <%s> may have" 298 " already run with tag <%s>" 299 % (outputdir, testname, tag) ) 300 raise error.TestError(msg) 301 os.mkdir(outputdir) 302 303 container = dargs.pop('container', None) 304 if container: 305 cname = container.get('name', None) 306 if not cname: # get old name 307 cname = container.get('container_name', None) 308 mbytes = container.get('mbytes', None) 309 if not mbytes: # get old name 310 mbytes = container.get('mem', None) 311 cpus = container.get('cpus', None) 312 if not cpus: # get old name 313 cpus = container.get('cpu', None) 314 root = container.get('root', None) 315 self.new_container(mbytes=mbytes, cpus=cpus, 316 root=root, name=cname) 317 # We are running in a container now... 318 319 def log_warning(reason): 320 self.record("WARN", subdir, testname, reason) 321 @disk_usage_monitor.watch(log_warning, "/", self.max_disk_usage_rate) 322 def group_func(): 323 try: 324 self._runtest(url, tag, args, dargs) 325 except error.TestNAError, detail: 326 self.record('TEST_NA', subdir, testname, 327 str(detail)) 328 raise 329 except Exception, detail: 330 self.record('FAIL', subdir, testname, 331 str(detail)) 332 raise 333 else: 334 self.record('GOOD', subdir, testname, 335 'completed successfully') 336 337 result, exc_info = self._rungroup(subdir, testname, group_func) 338 if container: 339 self.release_container() 340 if exc_info and isinstance(exc_info[1], error.TestError): 341 return False 342 elif exc_info: 343 raise exc_info[0], exc_info[1], exc_info[2] 344 else: 345 return True 346 347 348 def _rungroup(self, subdir, testname, function, *args, **dargs): 349 """\ 350 subdir: 351 name of the group 352 testname: 353 name of the test to run, or support step 354 function: 355 subroutine to run 356 *args: 357 arguments for the function 358 359 Returns a 2-tuple (result, 
exc_info) where result 360 is the return value of function, and exc_info is 361 the sys.exc_info() of the exception thrown by the 362 function (which may be None). 363 """ 364 365 result, exc_info = None, None 366 try: 367 self.record('START', subdir, testname) 368 self._increment_group_level() 369 result = function(*args, **dargs) 370 self._decrement_group_level() 371 self.record('END GOOD', subdir, testname) 372 except error.TestNAError, e: 373 self._decrement_group_level() 374 self.record('END TEST_NA', subdir, testname, str(e)) 375 except Exception, e: 376 exc_info = sys.exc_info() 377 self._decrement_group_level() 378 err_msg = str(e) + '\n' + traceback.format_exc() 379 self.record('END FAIL', subdir, testname, err_msg) 380 381 return result, exc_info 382 383 384 def run_group(self, function, *args, **dargs): 385 """\ 386 function: 387 subroutine to run 388 *args: 389 arguments for the function 390 """ 391 392 # Allow the tag for the group to be specified 393 name = function.__name__ 394 tag = dargs.pop('tag', None) 395 if tag: 396 name = tag 397 398 outputdir = os.path.join(self.resultdir, name) 399 if os.path.exists(outputdir): 400 msg = ("%s already exists, test <%s> may have" 401 " already run with tag <%s>" 402 % (outputdir, name, name) ) 403 raise error.TestError(msg) 404 os.mkdir(outputdir) 405 406 result, exc_info = self._rungroup(name, name, function, *args, **dargs) 407 408 # if there was a non-TestError exception, raise it 409 if exc_info and not isinstance(exc_info[1], error.TestError): 410 err = ''.join(traceback.format_exception(*exc_info)) 411 raise error.TestError(name + ' failed\n' + err) 412 413 # pass back the actual return value from the function 414 return result 415 416 417 def new_container(self, mbytes=None, cpus=None, root=None, name=None): 418 if not autotest_utils.grep('cpuset', '/proc/filesystems'): 419 print "Containers not enabled by latest reboot" 420 return # containers weren't enabled in this kernel boot 421 pid = os.getpid() 
422 if not name: 423 name = 'test%d' % pid # make arbitrary unique name 424 self.container = cpuset.cpuset(name, job_size=mbytes, job_pid=pid, 425 cpus=cpus, root=root) 426 # This job's python shell is now running in the new container 427 # and all forked test processes will inherit that container 428 429 430 def release_container(self): 431 if self.container: 432 self.container.release() 433 self.container = None 434 435 436 def cpu_count(self): 437 if self.container: 438 return len(self.container.cpus) 439 return autotest_utils.count_cpus() # use total system count 440 441 442 # Check the passed kernel identifier against the command line 443 # and the running kernel, abort the job on missmatch. 444 def kernel_check_ident(self, expected_when, expected_id, subdir, 445 type = 'src', patches=[]): 446 print (("POST BOOT: checking booted kernel " + 447 "mark=%d identity='%s' type='%s'") % 448 (expected_when, expected_id, type)) 449 450 running_id = autotest_utils.running_os_ident() 451 452 cmdline = utils.read_one_line("/proc/cmdline") 453 454 find_sum = re.compile(r'.*IDENT=(\d+)') 455 m = find_sum.match(cmdline) 456 cmdline_when = -1 457 if m: 458 cmdline_when = int(m.groups()[0]) 459 460 # We have all the facts, see if they indicate we 461 # booted the requested kernel or not. 
462 bad = False 463 if (type == 'src' and expected_id != running_id or 464 type == 'rpm' and 465 not running_id.startswith(expected_id + '::')): 466 print "check_kernel_ident: kernel identifier mismatch" 467 bad = True 468 if expected_when != cmdline_when: 469 print "check_kernel_ident: kernel command line mismatch" 470 bad = True 471 472 if bad: 473 print " Expected Ident: " + expected_id 474 print " Running Ident: " + running_id 475 print " Expected Mark: %d" % (expected_when) 476 print "Command Line Mark: %d" % (cmdline_when) 477 print " Command Line: " + cmdline 478 479 raise error.JobError("boot failure", "reboot.verify") 480 481 kernel_info = {'kernel': expected_id} 482 for i, patch in enumerate(patches): 483 kernel_info["patch%d" % i] = patch 484 self.record('GOOD', subdir, 'reboot.verify', expected_id) 485 self._decrement_group_level() 486 self.record('END GOOD', subdir, 'reboot', 487 optional_fields=kernel_info) 488 489 490 def filesystem(self, device, mountpoint = None, loop_size = 0): 491 if not mountpoint: 492 mountpoint = self.tmpdir 493 return filesystem.filesystem(self, device, mountpoint,loop_size) 494 495 496 def enable_external_logging(self): 497 pass 498 499 500 def disable_external_logging(self): 501 pass 502 503 504 def reboot_setup(self): 505 pass 506 507 508 def reboot(self, tag='autotest'): 509 self.reboot_setup() 510 self.record('START', None, 'reboot') 511 self._increment_group_level() 512 self.record('GOOD', None, 'reboot.start') 513 self.harness.run_reboot() 514 default = self.config_get('boot.set_default') 515 if default: 516 self.bootloader.set_default(tag) 517 else: 518 self.bootloader.boot_once(tag) 519 cmd = "(sleep 5; reboot) </dev/null >/dev/null 2>&1 &" 520 utils.system(cmd) 521 self.quit() 522 523 524 def noop(self, text): 525 print "job: noop: " + text 526 527 528 def parallel(self, *tasklist): 529 """Run tasks in parallel""" 530 531 pids = [] 532 old_log_filename = self.log_filename 533 for i, task in enumerate(tasklist): 534 
self.log_filename = old_log_filename + (".%d" % i) 535 task_func = lambda: task[0](*task[1:]) 536 pids.append(parallel.fork_start(self.resultdir, task_func)) 537 538 old_log_path = os.path.join(self.resultdir, old_log_filename) 539 old_log = open(old_log_path, "a") 540 exceptions = [] 541 for i, pid in enumerate(pids): 542 # wait for the task to finish 543 try: 544 parallel.fork_waitfor(self.resultdir, pid) 545 except Exception, e: 546 exceptions.append(e) 547 # copy the logs from the subtask into the main log 548 new_log_path = old_log_path + (".%d" % i) 549 if os.path.exists(new_log_path): 550 new_log = open(new_log_path) 551 old_log.write(new_log.read()) 552 new_log.close() 553 old_log.flush() 554 os.remove(new_log_path) 555 old_log.close() 556 557 self.log_filename = old_log_filename 558 559 # handle any exceptions raised by the parallel tasks 560 if exceptions: 561 msg = "%d task(s) failed" % len(exceptions) 562 raise error.JobError(msg, str(exceptions), exceptions) 563 564 565 def quit(self): 566 # XXX: should have a better name. 567 self.harness.run_pause() 568 raise error.JobContinue("more to come") 569 570 571 def complete(self, status): 572 """Clean up and exit""" 573 # We are about to exit 'complete' so clean up the control file. 574 try: 575 os.unlink(self.state_file) 576 except: 577 pass 578 579 self.harness.run_complete() 580 self.disable_external_logging() 581 sys.exit(status) 582 583 584 def set_state(self, var, val): 585 # Deep copies make sure that the state can't be altered 586 # without it being re-written. Perf wise, deep copies 587 # are overshadowed by pickling/loading. 588 self.state[var] = copy.deepcopy(val) 589 pickle.dump(self.state, open(self.state_file, 'w')) 590 591 592 def _load_state(self): 593 assert not hasattr(self, "state") 594 try: 595 self.state = pickle.load(open(self.state_file, 'r')) 596 self.state_existed = True 597 except Exception: 598 print "Initializing the state engine." 
599 self.state = {} 600 self.set_state('__steps', []) # writes pickle file 601 self.state_existed = False 602 603 604 def get_state(self, var, default=None): 605 if var in self.state or default == None: 606 val = self.state[var] 607 else: 608 val = default 609 return copy.deepcopy(val) 610 611 612 def __create_step_tuple(self, fn, args, dargs): 613 # Legacy code passes in an array where the first arg is 614 # the function or its name. 615 if isinstance(fn, list): 616 assert(len(args) == 0) 617 assert(len(dargs) == 0) 618 args = fn[1:] 619 fn = fn[0] 620 # Pickling actual functions is harry, thus we have to call 621 # them by name. Unfortunately, this means only functions 622 # defined globally can be used as a next step. 623 if callable(fn): 624 fn = fn.__name__ 625 if not isinstance(fn, types.StringTypes): 626 raise StepError("Next steps must be functions or " 627 "strings containing the function name") 628 ancestry = copy.copy(self.current_step_ancestry) 629 return (ancestry, fn, args, dargs) 630 631 632 def next_step_append(self, fn, *args, **dargs): 633 """Define the next step and place it at the end""" 634 steps = self.get_state('__steps') 635 steps.append(self.__create_step_tuple(fn, args, dargs)) 636 self.set_state('__steps', steps) 637 638 639 def next_step(self, fn, *args, **dargs): 640 """Create a new step and place it after any steps added 641 while running the current step but before any steps added in 642 previous steps""" 643 steps = self.get_state('__steps') 644 steps.insert(self.next_step_index, 645 self.__create_step_tuple(fn, args, dargs)) 646 self.next_step_index += 1 647 self.set_state('__steps', steps) 648 649 650 def next_step_prepend(self, fn, *args, **dargs): 651 """Insert a new step, executing first""" 652 steps = self.get_state('__steps') 653 steps.insert(0, self.__create_step_tuple(fn, args, dargs)) 654 self.next_step_index += 1 655 self.set_state('__steps', steps) 656 657 658 def _run_step_fn(self, local_vars, fn, args, dargs): 659 
"""Run a (step) function within the given context""" 660 661 local_vars['__args'] = args 662 local_vars['__dargs'] = dargs 663 exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars) 664 return local_vars['__ret'] 665 666 667 def _create_frame(self, global_vars, ancestry, fn_name): 668 """Set up the environment like it would have been when this 669 function was first defined. 670 671 Child step engine 'implementations' must have 'return locals()' 672 at end end of their steps. Because of this, we can call the 673 parent function and get back all child functions (i.e. those 674 defined within it). 675 676 Unfortunately, the call stack of the function calling 677 job.next_step might have been deeper than the function it 678 added. In order to make sure that the environment is what it 679 should be, we need to then pop off the frames we built until 680 we find the frame where the function was first defined.""" 681 682 # The copies ensure that the parent frames are not modified 683 # while building child frames. This matters if we then 684 # pop some frames in the next part of this function. 685 current_frame = copy.copy(global_vars) 686 frames = [current_frame] 687 for steps_fn_name in ancestry: 688 ret = self._run_step_fn(current_frame, steps_fn_name, [], {}) 689 current_frame = copy.copy(ret) 690 frames.append(current_frame) 691 692 while len(frames) > 2: 693 if fn_name not in frames[-2]: 694 break 695 if frames[-2][fn_name] != frames[-1][fn_name]: 696 break 697 frames.pop() 698 ancestry.pop() 699 700 return (frames[-1], ancestry) 701 702 703 def _add_step_init(self, local_vars, current_function): 704 """If the function returned a dictionary that includes a 705 function named 'step_init', prepend it to our list of steps. 
706 This will only get run the first time a function with a nested 707 use of the step engine is run.""" 708 709 if (isinstance(local_vars, dict) and 710 'step_init' in local_vars and 711 callable(local_vars['step_init'])): 712 # The init step is a child of the function 713 # we were just running. 714 self.current_step_ancestry.append(current_function) 715 self.next_step_prepend('step_init') 716 717 718 def step_engine(self): 719 """the stepping engine -- if the control file defines 720 step_init we will be using this engine to drive multiple runs. 721 """ 722 """Do the next step""" 723 724 # Set up the environment and then interpret the control file. 725 # Some control files will have code outside of functions, 726 # which means we need to have our state engine initialized 727 # before reading in the file. 728 global_control_vars = {'job': self} 729 exec(JOB_PREAMBLE, global_control_vars, global_control_vars) 730 execfile(self.control, global_control_vars, global_control_vars) 731 732 # If we loaded in a mid-job state file, then we presumably 733 # know what steps we have yet to run. 734 if not self.state_existed: 735 if global_control_vars.has_key('step_init'): 736 self.next_step(global_control_vars['step_init']) 737 738 # Iterate through the steps. If we reboot, we'll simply 739 # continue iterating on the next step. 
740 while len(self.get_state('__steps')) > 0: 741 steps = self.get_state('__steps') 742 (ancestry, fn_name, args, dargs) = steps.pop(0) 743 self.set_state('__steps', steps) 744 745 self.next_step_index = 0 746 ret = self._create_frame(global_control_vars, ancestry, fn_name) 747 local_vars, self.current_step_ancestry = ret 748 local_vars = self._run_step_fn(local_vars, fn_name, args, dargs) 749 self._add_step_init(local_vars, fn_name) 750 751 752 def _init_group_level(self): 753 self.group_level = self.get_state("__group_level", default=0) 754 755 756 def _increment_group_level(self): 757 self.group_level += 1 758 self.set_state("__group_level", self.group_level) 759 760 761 def _decrement_group_level(self): 762 self.group_level -= 1 763 self.set_state("__group_level", self.group_level) 764 765 766 def record(self, status_code, subdir, operation, status = '', 767 optional_fields=None): 768 """ 769 Record job-level status 770 771 The intent is to make this file both machine parseable and 772 human readable. That involves a little more complexity, but 773 really isn't all that bad ;-) 774 775 Format is <status code>\t<subdir>\t<operation>\t<status> 776 777 status code: (GOOD|WARN|FAIL|ABORT) 778 or START 779 or END (GOOD|WARN|FAIL|ABORT) 780 781 subdir: MUST be a relevant subdirectory in the results, 782 or None, which will be represented as '----' 783 784 operation: description of what you ran (e.g. 
"dbench", or 785 "mkfs -t foobar /dev/sda9") 786 787 status: error message or "completed sucessfully" 788 789 ------------------------------------------------------------ 790 791 Initial tabs indicate indent levels for grouping, and is 792 governed by self.group_level 793 794 multiline messages have secondary lines prefaced by a double 795 space (' ') 796 """ 797 798 if subdir: 799 if re.match(r'[\n\t]', subdir): 800 raise ValueError("Invalid character in subdir string") 801 substr = subdir 802 else: 803 substr = '----' 804 805 if not logging.is_valid_status(status_code): 806 raise ValueError("Invalid status code supplied: %s" % status_code) 807 if not operation: 808 operation = '----' 809 810 if re.match(r'[\n\t]', operation): 811 raise ValueError("Invalid character in operation string") 812 operation = operation.rstrip() 813 814 if not optional_fields: 815 optional_fields = {} 816 817 status = status.rstrip() 818 status = re.sub(r"\t", " ", status) 819 # Ensure any continuation lines are marked so we can 820 # detect them in the status file to ensure it is parsable. 821 status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", status) 822 823 # Generate timestamps for inclusion in the logs 824 epoch_time = int(time.time()) # seconds since epoch, in UTC 825 local_time = time.localtime(epoch_time) 826 optional_fields["timestamp"] = str(epoch_time) 827 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", 828 local_time) 829 830 fields = [status_code, substr, operation] 831 fields += ["%s=%s" % x for x in optional_fields.iteritems()] 832 fields.append(status) 833 834 msg = '\t'.join(str(x) for x in fields) 835 msg = '\t' * self.group_level + msg 836 837 msg_tag = "" 838 if "." 
in self.log_filename: 839 msg_tag = self.log_filename.split(".", 1)[1] 840 841 self.harness.test_status_detail(status_code, substr, operation, status, 842 msg_tag) 843 self.harness.test_status(msg, msg_tag) 844 845 # log to stdout (if enabled) 846 #if self.log_filename == self.DEFAULT_LOG_FILENAME: 847 print msg 848 849 # log to the "root" status log 850 status_file = os.path.join(self.resultdir, self.log_filename) 851 open(status_file, "a").write(msg + "\n") 852 853 # log to the subdir status log (if subdir is set) 854 if subdir: 855 dir = os.path.join(self.resultdir, subdir) 856 status_file = os.path.join(dir, self.DEFAULT_LOG_FILENAME) 857 open(status_file, "a").write(msg + "\n") 858 859 860class disk_usage_monitor: 861 def __init__(self, logging_func, device, max_mb_per_hour): 862 self.func = logging_func 863 self.device = device 864 self.max_mb_per_hour = max_mb_per_hour 865 866 867 def start(self): 868 self.initial_space = autotest_utils.freespace(self.device) 869 self.start_time = time.time() 870 871 872 def stop(self): 873 # if no maximum usage rate was set, we don't need to 874 # generate any warnings 875 if not self.max_mb_per_hour: 876 return 877 878 final_space = autotest_utils.freespace(self.device) 879 used_space = self.initial_space - final_space 880 stop_time = time.time() 881 total_time = stop_time - self.start_time 882 # round up the time to one minute, to keep extremely short 883 # tests from generating false positives due to short, badly 884 # timed bursts of activity 885 total_time = max(total_time, 60.0) 886 887 # determine the usage rate 888 bytes_per_sec = used_space / total_time 889 mb_per_sec = bytes_per_sec / 1024**2 890 mb_per_hour = mb_per_sec * 60 * 60 891 892 if mb_per_hour > self.max_mb_per_hour: 893 msg = ("disk space on %s was consumed at a rate of %.2f MB/hour") 894 msg %= (self.device, mb_per_hour) 895 self.func(msg) 896 897 898 @classmethod 899 def watch(cls, *monitor_args, **monitor_dargs): 900 """ Generic decorator to wrap a 
function call with the 901 standard create-monitor -> start -> call -> stop idiom.""" 902 def decorator(func): 903 def watched_func(*args, **dargs): 904 monitor = cls(*monitor_args, **monitor_dargs) 905 monitor.start() 906 try: 907 func(*args, **dargs) 908 finally: 909 monitor.stop() 910 return watched_func 911 return decorator 912 913 914def runjob(control, cont = False, tag = "default", harness_type = '', 915 use_external_logging = False): 916 """The main interface to this module 917 918 control 919 The control file to use for this job. 920 cont 921 Whether this is the continuation of a previously started job 922 """ 923 control = os.path.abspath(control) 924 state = control + '.state' 925 926 # instantiate the job object ready for the control file. 927 myjob = None 928 try: 929 # Check that the control file is valid 930 if not os.path.exists(control): 931 raise error.JobError(control + ": control file not found") 932 933 # When continuing, the job is complete when there is no 934 # state file, ensure we don't try and continue. 
935 if cont and not os.path.exists(state): 936 raise error.JobComplete("all done") 937 if cont == False and os.path.exists(state): 938 os.unlink(state) 939 940 myjob = job(control, tag, cont, harness_type, use_external_logging) 941 942 # Load in the users control file, may do any one of: 943 # 1) execute in toto 944 # 2) define steps, and select the first via next_step() 945 myjob.step_engine() 946 947 except error.JobContinue: 948 sys.exit(5) 949 950 except error.JobComplete: 951 sys.exit(1) 952 953 except error.JobError, instance: 954 print "JOB ERROR: " + instance.args[0] 955 if myjob: 956 command = None 957 if len(instance.args) > 1: 958 command = instance.args[1] 959 myjob.record('ABORT', None, command, instance.args[0]) 960 myjob._decrement_group_level() 961 myjob.record('END ABORT', None, None) 962 assert(myjob.group_level == 0) 963 myjob.complete(1) 964 else: 965 sys.exit(1) 966 967 except Exception, e: 968 msg = str(e) + '\n' + traceback.format_exc() 969 print "JOB ERROR: " + msg 970 if myjob: 971 myjob.record('ABORT', None, None, msg) 972 myjob._decrement_group_level() 973 myjob.record('END ABORT', None, None) 974 assert(myjob.group_level == 0) 975 myjob.complete(1) 976 else: 977 sys.exit(1) 978 979 # If we get here, then we assume the job is complete and good. 980 myjob._decrement_group_level() 981 myjob.record('END GOOD', None, None) 982 assert(myjob.group_level == 0) 983 984 myjob.complete(0) 985 986 987# site_job.py may be non-existant or empty, make sure that an appropriate 988# site_job class is created nevertheless 989try: 990 from site_job import site_job 991except ImportError: 992 class site_job(base_job): 993 pass 994 995class job(site_job): 996 pass 997