job.py revision 87cbc7fe4f0fef49c74c9647cc42c50088622449
1"""The main job wrapper 2 3This is the core infrastructure. 4""" 5 6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006""" 7 8# standard stuff 9import os, sys, re, pickle, shutil, time, traceback, types, copy 10 11# autotest stuff 12from autotest_lib.client.bin import autotest_utils 13from autotest_lib.client.common_lib import error, barrier, logging 14 15import parallel, kernel, xen, test, profilers, filesystem, fd_stack, boottool 16import harness, config, sysinfo, cpuset 17 18 19 20JOB_PREAMBLE = """ 21from common.error import * 22from autotest_utils import * 23""" 24 25 26class StepError(error.AutotestError): 27 pass 28 29 30class base_job: 31 """The actual job against which we do everything. 32 33 Properties: 34 autodir 35 The top level autotest directory (/usr/local/autotest). 36 Comes from os.environ['AUTODIR']. 37 bindir 38 <autodir>/bin/ 39 libdir 40 <autodir>/lib/ 41 testdir 42 <autodir>/tests/ 43 site_testdir 44 <autodir>/site_tests/ 45 profdir 46 <autodir>/profilers/ 47 tmpdir 48 <autodir>/tmp/ 49 resultdir 50 <autodir>/results/<jobtag> 51 stdout 52 fd_stack object for stdout 53 stderr 54 fd_stack object for stderr 55 profilers 56 the profilers object for this job 57 harness 58 the server harness object for this job 59 config 60 the job configuration for this job 61 """ 62 63 DEFAULT_LOG_FILENAME = "status" 64 65 def __init__(self, control, jobtag, cont, harness_type=None, 66 use_external_logging = False): 67 """ 68 control 69 The control file (pathname of) 70 jobtag 71 The job tag string (eg "default") 72 cont 73 If this is the continuation of this job 74 harness_type 75 An alternative server harness 76 """ 77 self.autodir = os.environ['AUTODIR'] 78 self.bindir = os.path.join(self.autodir, 'bin') 79 self.libdir = os.path.join(self.autodir, 'lib') 80 self.testdir = os.path.join(self.autodir, 'tests') 81 self.site_testdir = os.path.join(self.autodir, 'site_tests') 82 self.profdir = os.path.join(self.autodir, 'profilers') 83 self.tmpdir = os.path.join(self.autodir, 'tmp') 84 self.resultdir = os.path.join(self.autodir, 'results', jobtag) 85 self.sysinfodir = os.path.join(self.resultdir, 'sysinfo') 86 self.control = os.path.abspath(control) 87 self.state_file = self.control + '.state' 88 self.__load_state() 89 90 if not cont: 91 if os.path.exists(self.tmpdir): 92 cmd = ('umount -f %s > /dev/null 2> /dev/null' 93 % (self.tmpdir)) 94 autotest_utils.system(cmd, ignore_status=True) 95 autotest_utils.system('rm -rf ' + self.tmpdir) 96 os.mkdir(self.tmpdir) 97 98 results = os.path.join(self.autodir, 'results') 99 if not os.path.exists(results): 100 os.mkdir(results) 101 102 download = os.path.join(self.testdir, 'download') 103 if os.path.exists(download): 104 autotest_utils.system('rm -rf ' + download) 105 os.mkdir(download) 106 107 if os.path.exists(self.resultdir): 108 autotest_utils.system('rm -rf ' 109 + self.resultdir) 110 os.mkdir(self.resultdir) 111 os.mkdir(self.sysinfodir) 112 113 os.mkdir(os.path.join(self.resultdir, 'debug')) 114 os.mkdir(os.path.join(self.resultdir, 'analysis')) 115 116 shutil.copyfile(self.control, 117 os.path.join(self.resultdir, 'control')) 118 119 120 self.control = control 121 self.jobtag = jobtag 122 self.log_filename = self.DEFAULT_LOG_FILENAME 123 self.container = None 124 125 self.stdout = fd_stack.fd_stack(1, sys.stdout) 126 self.stderr = fd_stack.fd_stack(2, sys.stderr) 127 128 self._init_group_level() 129 130 self.config = config.config(self) 131 132 self.harness = harness.select(harness_type, self) 133 134 self.profilers = profilers.profilers(self) 135 136 try: 137 tool = self.config_get('boottool.executable') 138 self.bootloader = boottool.boottool(tool) 139 except: 140 pass 141 142 sysinfo.log_per_reboot_data(self.sysinfodir) 143 144 if not cont: 145 self.record('START', None, None) 146 self._increment_group_level() 147 148 self.harness.run_start() 149 150 if use_external_logging: 151 self.enable_external_logging() 152 153 # load the max disk usage rate - default to no monitoring 154 self.max_disk_usage_rate = self.get_state('__monitor_disk', 155 default=0.0) 156 157 158 def monitor_disk_usage(self, max_rate): 159 """\ 160 Signal that the job should monitor disk space usage on / 161 and generate a warning if a test uses up disk space at a 162 rate exceeding 'max_rate'. 163 164 Parameters: 165 max_rate - the maximium allowed rate of disk consumption 166 during a test, in MB/hour, or 0 to indicate 167 no limit. 168 """ 169 self.set_state('__monitor_disk', max_rate) 170 self.max_disk_usage_rate = max_rate 171 172 173 def relative_path(self, path): 174 """\ 175 Return a patch relative to the job results directory 176 """ 177 head = len(self.resultdir) + 1 # remove the / inbetween 178 return path[head:] 179 180 181 def control_get(self): 182 return self.control 183 184 185 def control_set(self, control): 186 self.control = os.path.abspath(control) 187 188 189 def harness_select(self, which): 190 self.harness = harness.select(which, self) 191 192 193 def config_set(self, name, value): 194 self.config.set(name, value) 195 196 197 def config_get(self, name): 198 return self.config.get(name) 199 200 def setup_dirs(self, results_dir, tmp_dir): 201 if not tmp_dir: 202 tmp_dir = os.path.join(self.tmpdir, 'build') 203 if not os.path.exists(tmp_dir): 204 os.mkdir(tmp_dir) 205 if not os.path.isdir(tmp_dir): 206 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 207 raise ValueError(e_msg) 208 209 # We label the first build "build" and then subsequent ones 210 # as "build.2", "build.3", etc. Whilst this is a little bit 211 # inconsistent, 99.9% of jobs will only have one build 212 # (that's not done as kernbench, sparse, or buildtest), 213 # so it works out much cleaner. One of life's comprimises. 214 if not results_dir: 215 results_dir = os.path.join(self.resultdir, 'build') 216 i = 2 217 while os.path.exists(results_dir): 218 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 219 i += 1 220 if not os.path.exists(results_dir): 221 os.mkdir(results_dir) 222 223 return (results_dir, tmp_dir) 224 225 226 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 227 kjob = None ): 228 """Summon a xen object""" 229 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 230 build_dir = 'xen' 231 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob) 232 233 234 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 235 """Summon a kernel object""" 236 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 237 build_dir = 'linux' 238 return kernel.auto_kernel(self, base_tree, results_dir, 239 tmp_dir, build_dir, leave) 240 241 242 def barrier(self, *args, **kwds): 243 """Create a barrier object""" 244 return barrier.barrier(*args, **kwds) 245 246 247 def setup_dep(self, deps): 248 """Set up the dependencies for this test. 249 250 deps is a list of libraries required for this test. 251 """ 252 for dep in deps: 253 try: 254 os.chdir(os.path.join(self.autodir, 'deps', dep)) 255 autotest_utils.system('./' + dep + '.py') 256 except: 257 err = "setting up dependency " + dep + "\n" 258 raise error.UnhandledError(err) 259 260 261 def __runtest(self, url, tag, args, dargs): 262 try: 263 l = lambda : test.runtest(self, url, tag, args, dargs) 264 pid = parallel.fork_start(self.resultdir, l) 265 parallel.fork_waitfor(self.resultdir, pid) 266 except error.AutotestError: 267 raise 268 except: 269 raise error.UnhandledError('running test ' + \ 270 self.__class__.__name__ + "\n") 271 272 273 def run_test(self, url, *args, **dargs): 274 """Summon a test object and run it. 275 276 tag 277 tag to add to testname 278 url 279 url of the test to run 280 """ 281 282 if not url: 283 raise TypeError("Test name is invalid. " 284 "Switched arguments?") 285 (group, testname) = test.testname(url) 286 tag = dargs.pop('tag', None) 287 container = dargs.pop('container', None) 288 subdir = testname 289 if tag: 290 subdir += '.' + tag 291 292 if container: 293 cname = container.get('name', None) 294 if not cname: # get old name 295 cname = container.get('container_name', None) 296 mbytes = container.get('mbytes', None) 297 if not mbytes: # get old name 298 mbytes = container.get('mem', None) 299 cpus = container.get('cpus', None) 300 if not cpus: # get old name 301 cpus = container.get('cpu', None) 302 root = container.get('root', None) 303 self.new_container(mbytes=mbytes, cpus=cpus, 304 root=root, name=cname) 305 # We are running in a container now... 306 307 def log_warning(reason): 308 self.record("WARN", subdir, testname, reason) 309 @disk_usage_monitor.watch(log_warning, "/", 310 self.max_disk_usage_rate) 311 def group_func(): 312 try: 313 self.__runtest(url, tag, args, dargs) 314 except error.TestNAError, detail: 315 self.record('TEST_NA', subdir, testname, 316 str(detail)) 317 raise 318 except Exception, detail: 319 self.record('FAIL', subdir, testname, 320 str(detail)) 321 raise 322 else: 323 self.record('GOOD', subdir, testname, 324 'completed successfully') 325 326 result, exc_info = self.__rungroup(subdir, group_func) 327 if container: 328 self.release_container() 329 if exc_info and isinstance(exc_info[1], error.TestError): 330 return False 331 elif exc_info: 332 raise exc_info[0], exc_info[1], exc_info[2] 333 else: 334 return True 335 336 337 def __rungroup(self, name, function, *args, **dargs): 338 """\ 339 name: 340 name of the group 341 function: 342 subroutine to run 343 *args: 344 arguments for the function 345 346 Returns a 2-tuple (result, exc_info) where result 347 is the return value of function, and exc_info is 348 the sys.exc_info() of the exception thrown by the 349 function (which may be None). 350 """ 351 352 result, exc_info = None, None 353 try: 354 self.record('START', None, name) 355 self._increment_group_level() 356 result = function(*args, **dargs) 357 self._decrement_group_level() 358 self.record('END GOOD', None, name) 359 except error.TestNAError, e: 360 self._decrement_group_level() 361 self.record('END TEST_NA', None, name, str(e)) 362 except Exception, e: 363 exc_info = sys.exc_info() 364 self._decrement_group_level() 365 err_msg = str(e) + '\n' + traceback.format_exc() 366 self.record('END FAIL', None, name, err_msg) 367 368 return result, exc_info 369 370 371 def run_group(self, function, *args, **dargs): 372 """\ 373 function: 374 subroutine to run 375 *args: 376 arguments for the function 377 """ 378 379 # Allow the tag for the group to be specified 380 name = function.__name__ 381 tag = dargs.pop('tag', None) 382 if tag: 383 name = tag 384 385 result, exc_info = self.__rungroup(name, function, 386 *args, **dargs) 387 388 # if there was a non-TestError exception, raise it 389 if exc_info and not isinstance(exc_info[1], error.TestError): 390 err = ''.join(traceback.format_exception(*exc_info)) 391 raise error.TestError(name + ' failed\n' + err) 392 393 # pass back the actual return value from the function 394 return result 395 396 397 def new_container(self, mbytes=None, cpus=None, root=None, name=None): 398 if not autotest_utils.grep('cpuset', '/proc/filesystems'): 399 print "Containers not enabled by latest reboot" 400 return # containers weren't enabled in this kernel boot 401 pid = os.getpid() 402 if not name: 403 name = 'test%d' % pid # make arbitrary unique name 404 self.container = cpuset.cpuset(name, job_size=mbytes, 405 job_pid=pid, cpus=cpus, root=root) 406 # This job's python shell is now running in the new container 407 # and all forked test processes will inherit that container 408 409 410 def release_container(self): 411 if self.container: 412 self.container.release() 413 self.container = None 414 415 416 def cpu_count(self): 417 if self.container: 418 return len(self.container.cpus) 419 return autotest_utils.count_cpus() # use total system count 420 421 422 # Check the passed kernel identifier against the command line 423 # and the running kernel, abort the job on missmatch. 424 def kernel_check_ident(self, expected_when, expected_id, subdir, 425 type = 'src', patches=[]): 426 print (("POST BOOT: checking booted kernel " + 427 "mark=%d identity='%s' type='%s'") % 428 (expected_when, expected_id, type)) 429 430 running_id = autotest_utils.running_os_ident() 431 432 cmdline = autotest_utils.read_one_line("/proc/cmdline") 433 434 find_sum = re.compile(r'.*IDENT=(\d+)') 435 m = find_sum.match(cmdline) 436 cmdline_when = -1 437 if m: 438 cmdline_when = int(m.groups()[0]) 439 440 # We have all the facts, see if they indicate we 441 # booted the requested kernel or not. 442 bad = False 443 if (type == 'src' and expected_id != running_id or 444 type == 'rpm' and 445 not running_id.startswith(expected_id + '::')): 446 print "check_kernel_ident: kernel identifier mismatch" 447 bad = True 448 if expected_when != cmdline_when: 449 print "check_kernel_ident: kernel command line mismatch" 450 bad = True 451 452 if bad: 453 print " Expected Ident: " + expected_id 454 print " Running Ident: " + running_id 455 print " Expected Mark: %d" % (expected_when) 456 print "Command Line Mark: %d" % (cmdline_when) 457 print " Command Line: " + cmdline 458 459 raise error.JobError("boot failure", "reboot.verify") 460 461 kernel_info = {'kernel': expected_id} 462 for i, patch in enumerate(patches): 463 kernel_info["patch%d" % i] = patch 464 self.record('GOOD', subdir, 'reboot.verify', expected_id) 465 self._decrement_group_level() 466 self.record('END GOOD', subdir, 'reboot', 467 optional_fields=kernel_info) 468 469 470 def filesystem(self, device, mountpoint = None, loop_size = 0): 471 if not mountpoint: 472 mountpoint = self.tmpdir 473 return filesystem.filesystem(self, device, mountpoint,loop_size) 474 475 476 def enable_external_logging(self): 477 pass 478 479 480 def disable_external_logging(self): 481 pass 482 483 484 def reboot_setup(self): 485 pass 486 487 488 def reboot(self, tag='autotest'): 489 self.reboot_setup() 490 self.record('START', None, 'reboot') 491 self._increment_group_level() 492 self.record('GOOD', None, 'reboot.start') 493 self.harness.run_reboot() 494 default = self.config_get('boot.set_default') 495 if default: 496 self.bootloader.set_default(tag) 497 else: 498 self.bootloader.boot_once(tag) 499 cmd = "(sleep 5; reboot) </dev/null >/dev/null 2>&1 &" 500 autotest_utils.system(cmd) 501 self.quit() 502 503 504 def noop(self, text): 505 print "job: noop: " + text 506 507 508 def parallel(self, *tasklist): 509 """Run tasks in parallel""" 510 511 pids = [] 512 old_log_filename = self.log_filename 513 for i, task in enumerate(tasklist): 514 self.log_filename = old_log_filename + (".%d" % i) 515 task_func = lambda: task[0](*task[1:]) 516 pids.append(parallel.fork_start(self.resultdir, 517 task_func)) 518 519 old_log_path = os.path.join(self.resultdir, old_log_filename) 520 old_log = open(old_log_path, "a") 521 exceptions = [] 522 for i, pid in enumerate(pids): 523 # wait for the task to finish 524 try: 525 parallel.fork_waitfor(self.resultdir, pid) 526 except Exception, e: 527 exceptions.append(e) 528 # copy the logs from the subtask into the main log 529 new_log_path = old_log_path + (".%d" % i) 530 if os.path.exists(new_log_path): 531 new_log = open(new_log_path) 532 old_log.write(new_log.read()) 533 new_log.close() 534 old_log.flush() 535 os.remove(new_log_path) 536 old_log.close() 537 538 self.log_filename = old_log_filename 539 540 # handle any exceptions raised by the parallel tasks 541 if exceptions: 542 msg = "%d task(s) failed" % len(exceptions) 543 raise error.JobError(msg, str(exceptions), exceptions) 544 545 546 def quit(self): 547 # XXX: should have a better name. 548 self.harness.run_pause() 549 raise error.JobContinue("more to come") 550 551 552 def complete(self, status): 553 """Clean up and exit""" 554 # We are about to exit 'complete' so clean up the control file. 555 try: 556 os.unlink(self.state_file) 557 except: 558 pass 559 560 self.harness.run_complete() 561 self.disable_external_logging() 562 sys.exit(status) 563 564 565 def set_state(self, var, val): 566 # Deep copies make sure that the state can't be altered 567 # without it being re-written. Perf wise, deep copies 568 # are overshadowed by pickling/loading. 569 self.state[var] = copy.deepcopy(val) 570 pickle.dump(self.state, open(self.state_file, 'w')) 571 572 573 def __load_state(self): 574 assert not hasattr(self, "state") 575 try: 576 self.state = pickle.load(open(self.state_file, 'r')) 577 self.state_existed = True 578 except Exception: 579 print "Initializing the state engine." 580 self.state = {} 581 self.set_state('__steps', []) # writes pickle file 582 self.state_existed = False 583 584 585 def get_state(self, var, default=None): 586 if var in self.state or default == None: 587 val = self.state[var] 588 else: 589 val = default 590 return copy.deepcopy(val) 591 592 593 def __create_step_tuple(self, fn, args, dargs): 594 # Legacy code passes in an array where the first arg is 595 # the function or its name. 596 if isinstance(fn, list): 597 assert(len(args) == 0) 598 assert(len(dargs) == 0) 599 args = fn[1:] 600 fn = fn[0] 601 # Pickling actual functions is harry, thus we have to call 602 # them by name. Unfortunately, this means only functions 603 # defined globally can be used as a next step. 604 if isinstance(fn, types.FunctionType): 605 fn = fn.__name__ 606 if not isinstance(fn, types.StringTypes): 607 raise StepError("Next steps must be functions or " 608 "strings containing the function name") 609 return (fn, args, dargs) 610 611 612 def next_step(self, fn, *args, **dargs): 613 """Define the next step""" 614 steps = self.get_state('__steps') 615 steps.append(self.__create_step_tuple(fn, args, dargs)) 616 self.set_state('__steps', steps) 617 618 619 def next_step_prepend(self, fn, *args, **dargs): 620 """Insert a new step, executing first""" 621 steps = self.get_state('__steps') 622 steps.insert(0, self.__create_step_tuple(fn, args, dargs)) 623 self.set_state('__steps', steps) 624 625 626 def step_engine(self): 627 """the stepping engine -- if the control file defines 628 step_init we will be using this engine to drive multiple runs. 629 """ 630 """Do the next step""" 631 632 # Set up the environment and then interpret the control file. 633 # Some control files will have code outside of functions, 634 # which means we need to have our state engine initialized 635 # before reading in the file. 636 lcl = {'job': self} 637 exec(JOB_PREAMBLE, lcl, lcl) 638 execfile(self.control, lcl, lcl) 639 640 # If we loaded in a mid-job state file, then we presumably 641 # know what steps we have yet to run. 642 if not self.state_existed: 643 if lcl.has_key('step_init'): 644 self.next_step([lcl['step_init']]) 645 646 # Iterate through the steps. If we reboot, we'll simply 647 # continue iterating on the next step. 648 while len(self.get_state('__steps')) > 0: 649 steps = self.get_state('__steps') 650 (fn, args, dargs) = steps.pop(0) 651 self.set_state('__steps', steps) 652 653 lcl['__args'] = args 654 lcl['__dargs'] = dargs 655 exec(fn + "(*__args, **__dargs)", lcl, lcl) 656 657 658 def _init_group_level(self): 659 self.group_level = self.get_state("__group_level", default=0) 660 661 662 def _increment_group_level(self): 663 self.group_level += 1 664 self.set_state("__group_level", self.group_level) 665 666 667 def _decrement_group_level(self): 668 self.group_level -= 1 669 self.set_state("__group_level", self.group_level) 670 671 672 def record(self, status_code, subdir, operation, status = '', 673 optional_fields=None): 674 """ 675 Record job-level status 676 677 The intent is to make this file both machine parseable and 678 human readable. That involves a little more complexity, but 679 really isn't all that bad ;-) 680 681 Format is <status code>\t<subdir>\t<operation>\t<status> 682 683 status code: (GOOD|WARN|FAIL|ABORT) 684 or START 685 or END (GOOD|WARN|FAIL|ABORT) 686 687 subdir: MUST be a relevant subdirectory in the results, 688 or None, which will be represented as '----' 689 690 operation: description of what you ran (e.g. "dbench", or 691 "mkfs -t foobar /dev/sda9") 692 693 status: error message or "completed sucessfully" 694 695 ------------------------------------------------------------ 696 697 Initial tabs indicate indent levels for grouping, and is 698 governed by self.group_level 699 700 multiline messages have secondary lines prefaced by a double 701 space (' ') 702 """ 703 704 if subdir: 705 if re.match(r'[\n\t]', subdir): 706 raise ValueError("Invalid character in " 707 "subdir string") 708 substr = subdir 709 else: 710 substr = '----' 711 712 if not logging.is_valid_status(status_code): 713 raise ValueError("Invalid status code supplied: %s" % 714 status_code) 715 if not operation: 716 operation = '----' 717 718 if re.match(r'[\n\t]', operation): 719 raise ValueError("Invalid character in " 720 "operation string") 721 operation = operation.rstrip() 722 723 if not optional_fields: 724 optional_fields = {} 725 726 status = status.rstrip() 727 status = re.sub(r"\t", " ", status) 728 # Ensure any continuation lines are marked so we can 729 # detect them in the status file to ensure it is parsable. 730 status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", 731 status) 732 733 # Generate timestamps for inclusion in the logs 734 epoch_time = int(time.time()) # seconds since epoch, in UTC 735 local_time = time.localtime(epoch_time) 736 optional_fields["timestamp"] = str(epoch_time) 737 optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S", 738 local_time) 739 740 fields = [status_code, substr, operation] 741 fields += ["%s=%s" % x for x in optional_fields.iteritems()] 742 fields.append(status) 743 744 msg = '\t'.join(str(x) for x in fields) 745 msg = '\t' * self.group_level + msg 746 747 msg_tag = "" 748 if "." in self.log_filename: 749 msg_tag = self.log_filename.split(".", 1)[1] 750 751 self.harness.test_status_detail(status_code, substr, 752 operation, status, msg_tag) 753 self.harness.test_status(msg, msg_tag) 754 755 # log to stdout (if enabled) 756 #if self.log_filename == self.DEFAULT_LOG_FILENAME: 757 print msg 758 759 # log to the "root" status log 760 status_file = os.path.join(self.resultdir, self.log_filename) 761 open(status_file, "a").write(msg + "\n") 762 763 # log to the subdir status log (if subdir is set) 764 if subdir: 765 dir = os.path.join(self.resultdir, subdir) 766 if not os.path.exists(dir): 767 os.mkdir(dir) 768 769 status_file = os.path.join(dir, 770 self.DEFAULT_LOG_FILENAME) 771 open(status_file, "a").write(msg + "\n") 772 773 774class disk_usage_monitor: 775 def __init__(self, logging_func, device, max_mb_per_hour): 776 self.func = logging_func 777 self.device = device 778 self.max_mb_per_hour = max_mb_per_hour 779 780 781 def start(self): 782 self.initial_space = autotest_utils.freespace(self.device) 783 self.start_time = time.time() 784 785 786 def stop(self): 787 # if no maximum usage rate was set, we don't need to 788 # generate any warnings 789 if not self.max_mb_per_hour: 790 return 791 792 final_space = autotest_utils.freespace(self.device) 793 used_space = self.initial_space - final_space 794 stop_time = time.time() 795 total_time = stop_time - self.start_time 796 # round up the time to one minute, to keep extremely short 797 # tests from generating false positives due to short, badly 798 # timed bursts of activity 799 total_time = max(total_time, 60.0) 800 801 # determine the usage rate 802 bytes_per_sec = used_space / total_time 803 mb_per_sec = bytes_per_sec / 1024**2 804 mb_per_hour = mb_per_sec * 60 * 60 805 806 if mb_per_hour > self.max_mb_per_hour: 807 msg = ("disk space on %s was consumed at a rate of " 808 "%.2f MB/hour") 809 msg %= (self.device, mb_per_hour) 810 self.func(msg) 811 812 813 @classmethod 814 def watch(cls, *monitor_args, **monitor_dargs): 815 """ Generic decorator to wrap a function call with the 816 standard create-monitor -> start -> call -> stop idiom.""" 817 def decorator(func): 818 def watched_func(*args, **dargs): 819 monitor = cls(*monitor_args, **monitor_dargs) 820 monitor.start() 821 try: 822 func(*args, **dargs) 823 finally: 824 monitor.stop() 825 return watched_func 826 return decorator 827 828 829def runjob(control, cont = False, tag = "default", harness_type = '', 830 use_external_logging = False): 831 """The main interface to this module 832 833 control 834 The control file to use for this job. 835 cont 836 Whether this is the continuation of a previously started job 837 """ 838 control = os.path.abspath(control) 839 state = control + '.state' 840 841 # instantiate the job object ready for the control file. 842 myjob = None 843 try: 844 # Check that the control file is valid 845 if not os.path.exists(control): 846 raise error.JobError(control + 847 ": control file not found") 848 849 # When continuing, the job is complete when there is no 850 # state file, ensure we don't try and continue. 851 if cont and not os.path.exists(state): 852 raise error.JobComplete("all done") 853 if cont == False and os.path.exists(state): 854 os.unlink(state) 855 856 myjob = job(control, tag, cont, harness_type, 857 use_external_logging) 858 859 # Load in the users control file, may do any one of: 860 # 1) execute in toto 861 # 2) define steps, and select the first via next_step() 862 myjob.step_engine() 863 864 except error.JobContinue: 865 sys.exit(5) 866 867 except error.JobComplete: 868 sys.exit(1) 869 870 except error.JobError, instance: 871 print "JOB ERROR: " + instance.args[0] 872 if myjob: 873 command = None 874 if len(instance.args) > 1: 875 command = instance.args[1] 876 myjob.record('ABORT', None, command, instance.args[0]) 877 myjob._decrement_group_level() 878 myjob.record('END ABORT', None, None) 879 assert(myjob.group_level == 0) 880 myjob.complete(1) 881 else: 882 sys.exit(1) 883 884 except Exception, e: 885 msg = str(e) + '\n' + traceback.format_exc() 886 print "JOB ERROR: " + msg 887 if myjob: 888 myjob.record('ABORT', None, None, msg) 889 myjob._decrement_group_level() 890 myjob.record('END ABORT', None, None) 891 assert(myjob.group_level == 0) 892 myjob.complete(1) 893 else: 894 sys.exit(1) 895 896 # If we get here, then we assume the job is complete and good. 897 myjob._decrement_group_level() 898 myjob.record('END GOOD', None, None) 899 assert(myjob.group_level == 0) 900 901 myjob.complete(0) 902 903 904# site_job.py may be non-existant or empty, make sure that an appropriate 905# site_job class is created nevertheless 906try: 907 from site_job import site_job 908except ImportError: 909 class site_job(base_job): 910 pass 911 912class job(site_job): 913 pass 914 915