job.py revision 6b504ff04e21af766da0cfbcde8e1e352a474dfa
"""The main job wrapper

This is the core infrastructure.
"""

__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""

# standard stuff
import os, sys, re, pickle, shutil, time, traceback
# autotest stuff
from autotest_utils import *
from parallel import *
from common.error import *
from common import barrier
import kernel, xen, test, profilers, filesystem, fd_stack, boottool
import harness, config
import sysinfo
import cpuset

class job:
    """The actual job against which we do everything.

    Properties:
        autodir
            The top level autotest directory (/usr/local/autotest).
            Comes from os.environ['AUTODIR'].
        bindir
            <autodir>/bin/
        testdir
            <autodir>/tests/
        profdir
            <autodir>/profilers/
        tmpdir
            <autodir>/tmp/
        resultdir
            <autodir>/results/<jobtag>
        stdout
            fd_stack object for stdout
        stderr
            fd_stack object for stderr
        profilers
            the profilers object for this job
        harness
            the server harness object for this job
        config
            the job configuration for this job
    """

    def __init__(self, control, jobtag, cont, harness_type=None):
        """Initialize the job, creating its directory tree unless continuing.

        control
            The control file (pathname of)
        jobtag
            The job tag string (eg "default")
        cont
            If this is the continuation of this job
        harness_type
            An alternative server harness
        """
        self.autodir = os.environ['AUTODIR']
        self.bindir = os.path.join(self.autodir, 'bin')
        self.testdir = os.path.join(self.autodir, 'tests')
        self.profdir = os.path.join(self.autodir, 'profilers')
        self.tmpdir = os.path.join(self.autodir, 'tmp')
        self.resultdir = os.path.join(self.autodir, 'results', jobtag)
        self.control = os.path.abspath(control)

        # On a fresh (non-continuation) run, wipe and recreate the scratch
        # and result directories so the job starts from a clean slate.
        if not cont:
            if os.path.exists(self.tmpdir):
                # Best-effort unmount of anything left mounted under tmpdir
                # by a previous run; failure is deliberately ignored.
                system('umount -f %s > /dev/null 2> /dev/null'%\
                        self.tmpdir, ignorestatus=True)
                system('rm -rf ' + self.tmpdir)
            os.mkdir(self.tmpdir)

            results = os.path.join(self.autodir, 'results')
            if not os.path.exists(results):
                os.mkdir(results)

            download = os.path.join(self.testdir, 'download')
            if os.path.exists(download):
                system('rm -rf ' + download)
            os.mkdir(download)

            if os.path.exists(self.resultdir):
                system('rm -rf ' + self.resultdir)
            os.mkdir(self.resultdir)

            os.mkdir(os.path.join(self.resultdir, 'debug'))
            os.mkdir(os.path.join(self.resultdir, 'analysis'))
            os.mkdir(os.path.join(self.resultdir, 'sysinfo'))

            # Preserve a copy of the control file alongside the results.
            shutil.copyfile(self.control,
                    os.path.join(self.resultdir, 'control'))

        # Note: self.control reverts to the caller-supplied (possibly
        # relative) path here, overwriting the abspath set above.
        self.control = control
        self.jobtag = jobtag

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)
        # Nesting depth of START/END groups; governs status-log indentation.
        self.group_level = 0

        self.config = config.config(self)

        self.harness = harness.select(harness_type, self)

        self.profilers = profilers.profilers(self)

        # boottool is optional: if the config has no usable
        # boottool.executable entry, self.bootloader is simply never set.
        try:
            tool = self.config_get('boottool.executable')
            self.bootloader = boottool.boottool(tool)
        except:
            pass

        # log "before each step" sysinfo
        pwd = os.getcwd()
        try:
            os.chdir(os.path.join(self.resultdir, 'sysinfo'))
            sysinfo.before_each_step()
        finally:
            os.chdir(pwd)

        if not cont:
            self.record('START', None, None)
            self.group_level = 1

        self.harness.run_start()


    def relative_path(self, path):
        """\
        Return a path relative to the job results directory
        """
        head = len(self.resultdir) + 1     # remove the / inbetween
        return path[head:]


    def control_get(self):
        """Return the current control file path."""
        return self.control


    def control_set(self, control):
        """Set the control file path (stored as an absolute path)."""
        self.control = os.path.abspath(control)


    def harness_select(self, which):
        """Switch this job to a different server harness."""
        self.harness = harness.select(which, self)


    def config_set(self, name, value):
        """Set a job configuration value by name."""
        self.config.set(name, value)


    def config_get(self, name):
        """Get a job configuration value by name."""
        return self.config.get(name)

    def setup_dirs(self, results_dir, tmp_dir):
        """Prepare (and create if needed) build result/tmp directories.

        Empty arguments select the defaults: <tmpdir>/build for tmp_dir
        and <resultdir>/build (or build.N for repeats) for results_dir.
        Returns the (results_dir, tmp_dir) tuple actually chosen.
        """
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
            if not os.path.exists(tmp_dir):
                os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            raise "Temp dir (%s) is not a dir - args backwards?" \
                            % self.tmpdir

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's comprimises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir,
                               'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
                kjob = None ):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob)


    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.auto_kernel(self, base_tree, results_dir,
                      tmp_dir, build_dir, leave)


    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)


    def setup_dep(self, deps):
        """Set up the dependencies for this test.

        deps is a list of libraries required for this test.
        """
        # Each dependency is built by running <autodir>/deps/<dep>/<dep>.py
        # from inside its own directory; any failure is wrapped as an
        # UnhandledError naming the dependency.
        for dep in deps:
            try:
                os.chdir(os.path.join(self.autodir, 'deps', dep))
                system('./' + dep + '.py')
            except:
                error = "setting up dependency " + dep + "\n"
                raise UnhandledError(error)


    def __runtest(self, url, tag, args, dargs):
        """Run one test in a forked child, re-raising autotest errors
        as-is and wrapping anything else as UnhandledError."""
        try:
            l = lambda : test.runtest(self, url, tag, args, dargs)
            pid = fork_start(self.resultdir, l)
            fork_waitfor(self.resultdir, pid)
        except AutotestError:
            raise
        except:
            raise UnhandledError('running test ' + \
                self.__class__.__name__ + "\n")


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        tag
            tag to add to testname
        url
            url of the test to run

        Returns True on success, False when the test raised a TestError;
        any other exception from the test is re-raised to the caller.
        """

        if not url:
            raise "Test name is invalid. Switched arguments?"
        (group, testname) = test.testname(url)
        tag = dargs.pop('tag', None)
        self.container = None
        container = dargs.pop('container', None)
        subdir = testname
        if tag:
            subdir += '.' + tag

        if container:
            container_name = container.pop('container_name', None)
            cpu = container.get('cpu', None)
            root_container = container.get('root', 'sys')
            if not container_name:
                container_name = testname
            # NOTE(review): the original formatting was lost here; this is
            # the only syntactically valid reading (the cpuset is created as
            # the body of the grep test).  The sense of grep()'s return value
            # comes from autotest_utils and is not visible in this file --
            # confirm the condition's polarity against that helper.
            if not grep('cpusets', '/proc/filesystems'):

                self.container = cpuset.cpuset(container_name,
                               container['mem'],
                               os.getpid(),
                               root = root_container,
                               cpus = cpu)
            # We are running in a container now...

        def group_func():
            # Run the test and record GOOD/FAIL around it; the exception
            # is re-raised so __rungroup can capture exc_info.
            try:
                self.__runtest(url, tag, args, dargs)
            except Exception, detail:
                self.record('FAIL', subdir, testname,
                        str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname,
                        'completed successfully')
        result, exc_info = self.__rungroup(subdir, group_func)
        if self.container:
            self.container.release()
            self.container = None

        if exc_info and isinstance(exc_info[1], TestError):
            return False
        elif exc_info:
            # Re-raise with the original traceback (Python 2 3-arg raise).
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True


    def __rungroup(self, name, function, *args, **dargs):
        """\
        name:
            name of the group
        function:
            subroutine to run
        *args:
            arguments for the function

        Returns a 2-tuple (result, exc_info) where result
        is the return value of function, and exc_info is
        the sys.exc_info() of the exception thrown by the
        function (which may be None).
        """

        result, exc_info = None, None
        try:
            self.record('START', None, name)
            self.group_level += 1
            result = function(*args, **dargs)
            self.group_level -= 1
            self.record('END GOOD', None, name)
        except Exception, e:
            # Capture the failure rather than propagating it; the caller
            # decides what to do with exc_info.
            exc_info = sys.exc_info()
            self.group_level -= 1
            err_msg = str(e) + '\n' + format_error()
            self.record('END FAIL', None, name, err_msg)

        return result, exc_info


    def run_group(self, function, *args, **dargs):
        """\
        function:
            subroutine to run
        *args:
            arguments for the function
        """

        # Allow the tag for the group to be specified
        name = function.__name__
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        result, exc_info = self.__rungroup(name, function,
                           *args, **dargs)

        # if there was a non-TestError exception, raise it
        # NOTE(review): the comment above and the isinstance() test below
        # disagree -- as written, a TestError is what gets re-wrapped and
        # raised.  Confirm the intended condition against callers.
        if exc_info and isinstance(exc_info[1], TestError):
            err = ''.join(traceback.format_exception(*exc_info))
            raise TestError(name + ' failed\n' + err)

        # pass back the actual return value from the function
        return result


    # Check the passed kernel identifier against the command line
    # and the running kernel, abort the job on missmatch.
    def kernel_check_ident(self, expected_when, expected_id, expected_cl, subdir, type = 'src'):
        """Verify after reboot that the expected kernel is running.

        Compares the expected identity/mark/changelist against the running
        kernel ident, the IDENT= marker on /proc/cmdline, and `uname -v`;
        raises JobError("boot failure") on any mismatch, otherwise records
        GOOD for reboot.verify.
        """
        print "POST BOOT: checking booted kernel mark=%d identity='%s' changelist=%s type='%s'" \
            % (expected_when, expected_id, expected_cl, type)

        running_id = running_os_ident()

        cmdline = read_one_line("/proc/cmdline")

        # The boot entry embeds IDENT=<mark> on the kernel command line.
        find_sum = re.compile(r'.*IDENT=(\d+)')
        m = find_sum.match(cmdline)
        cmdline_when = -1
        if m:
            cmdline_when = int(m.groups()[0])

        # Extract a (7+ digit) changelist number from `uname -v`, if any.
        cl_re = re.compile(r'\d{7,}')
        cl_match = cl_re.search(system_output('uname -v').split()[1])
        if cl_match:
            current_cl = cl_match.group()
        else:
            current_cl = None

        # We have all the facts, see if they indicate we
        # booted the requested kernel or not.
        bad = False
        if (type == 'src' and expected_id != running_id or
            type == 'rpm' and not running_id.startswith(expected_id + '::')):
            print "check_kernel_ident: kernel identifier mismatch"
            bad = True
        if expected_when != cmdline_when:
            print "check_kernel_ident: kernel command line mismatch"
            bad = True
        if expected_cl and current_cl and str(expected_cl) != current_cl:
            print 'check_kernel_ident: kernel changelist mismatch'
            bad = True

        if bad:
            print "   Expected Ident: " + expected_id
            print "    Running Ident: " + running_id
            print "    Expected Mark: %d" % (expected_when)
            print "Command Line Mark: %d" % (cmdline_when)
            print "   Expected P4 CL: %s" % expected_cl
            print "            P4 CL: %s" % current_cl
            print "     Command Line: " + cmdline

            raise JobError("boot failure", "reboot.verify")

        self.record('GOOD', subdir, 'reboot.verify')


    def filesystem(self, device, mountpoint = None, loop_size = 0):
        """Summon a filesystem object; mountpoint defaults to tmpdir."""
        if not mountpoint:
            mountpoint = self.tmpdir
        return filesystem.filesystem(self, device, mountpoint, loop_size)


    def reboot(self, tag='autotest'):
        """Record the reboot, select the boot entry <tag>, reboot the
        machine, and exit via quit() so the job can continue afterwards."""
        self.record('GOOD', None, 'reboot.start')
        self.harness.run_reboot()
        default = self.config_get('boot.set_default')
        if default:
            self.bootloader.set_default(tag)
        else:
            self.bootloader.boot_once(tag)
        # Detach the reboot from this process so we can exit cleanly first.
        system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()


    def noop(self, text):
        """Do nothing but echo the given text (useful as a step)."""
        print "job: noop: " + text


    # Job control primatives.

    def __parallel_execute(self, func, *args):
        # Trampoline run inside each forked child of parallel().
        func(*args)


    def parallel(self, *tasklist):
        """Run tasks in parallel

        Each task is a tuple of (function, args...); every task is forked
        off and all are waited for before returning.
        """

        pids = []
        for task in tasklist:
            pids.append(fork_start(self.resultdir,
                    lambda: self.__parallel_execute(*task)))
        for pid in pids:
            fork_waitfor(self.resultdir, pid)


    def quit(self):
        # XXX: should have a better name.
        # Unwinds out of step_engine via JobContinue, signalling runjob()
        # to exit with status 5 (job to be continued, e.g. across reboot).
        self.harness.run_pause()
        raise JobContinue("more to come")


    def complete(self, status):
        """Clean up and exit"""
        # We are about to exit 'complete' so clean up the control file.
        try:
            os.unlink(self.control + '.state')
        except:
            pass
        self.harness.run_complete()
        sys.exit(status)


    # NOTE(review): class-level default -- until step_engine() rebinds
    # self.steps from the pickled state file, next_step()'s append mutates
    # this shared class attribute.
    steps = []
    def next_step(self, step):
        """Define the next step

        step is a list of [function-or-name, args...]; a function is
        replaced by its __name__ so the step list can be pickled.
        """
        if not isinstance(step[0], basestring):
            step[0] = step[0].__name__
        self.steps.append(step)
        pickle.dump(self.steps, open(self.control + '.state', 'w'))


    def next_step_prepend(self, step):
        """Insert a new step, executing first"""
        if not isinstance(step[0], basestring):
            step[0] = step[0].__name__
        self.steps.insert(0, step)
        pickle.dump(self.steps, open(self.control + '.state', 'w'))


    def step_engine(self):
        """the stepping engine -- if the control file defines
        step_init we will be using this engine to drive multiple runs.
        """
        """Do the next step"""
        # The control file executes in this namespace with 'job' bound
        # to us, plus the error and util helpers exec'd in below.
        lcl = dict({'job': self})

        str = """
from common.error import *
from autotest_utils import *
"""
        exec(str, lcl, lcl)
        execfile(self.control, lcl, lcl)

        state = self.control + '.state'
        # If there is a mid-job state file load that in and continue
        # where it indicates. Otherwise start stepping at the passed
        # entry.
        try:
            self.steps = pickle.load(open(state, 'r'))
        except:
            if lcl.has_key('step_init'):
                self.next_step([lcl['step_init']])

        # Run the step list.  The remaining list is re-pickled before each
        # step so a reboot mid-step resumes at the following one.
        while len(self.steps) > 0:
            step = self.steps.pop(0)
            pickle.dump(self.steps, open(state, 'w'))

            cmd = step.pop(0)
            lcl['__args'] = step
            exec(cmd + "(*__args)", lcl, lcl)


    def record(self, status_code, subdir, operation, status = ''):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: (GOOD|WARN|FAIL|ABORT)
            or   START
            or   END (GOOD|WARN|FAIL|ABORT)

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed sucessfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and is
        governed by self.group_level

        multiline messages have secondary lines prefaced by a double
        space ('  ')
        """

        if subdir:
            if re.match(r'[\n\t]', subdir):
                raise "Invalid character in subdir string"
            substr = subdir
        else:
            substr = '----'

        if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
                        status_code):
            raise "Invalid status code supplied: %s" % status_code
        if not operation:
            operation = '----'
        if re.match(r'[\n\t]', operation):
            raise "Invalid character in operation string"
        operation = operation.rstrip()
        status = status.rstrip()
        # Tabs are field separators, so squash any embedded in the message.
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)

        # Generate timestamps for inclusion in the logs
        epoch_time = int(time.time())  # seconds since epoch, in UTC
        local_time = time.localtime(epoch_time)
        epoch_time_str = "timestamp=%d" % (epoch_time,)
        local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
                           local_time)

        msg = '\t'.join(str(x) for x in (status_code, substr, operation,
                         epoch_time_str, local_time_str,
                         status))
        msg = '\t' * self.group_level + msg

        self.harness.test_status_detail(status_code, substr,
                        operation, status)
        self.harness.test_status(msg)
        print msg
        # Append to the job-level status file, and mirror into the
        # subdir's own status file when one applies.
        status_file = os.path.join(self.resultdir, 'status')
        open(status_file, "a").write(msg + "\n")
        if subdir:
            status_file = os.path.join(self.resultdir, subdir, 'status')
            open(status_file, "a").write(msg + "\n")


def runjob(control, cont = False, tag = "default", harness_type = ''):
    """The main interface to this module

    control
        The control file to use for this job.
    cont
        Whether this is the continuation of a previously started job

    Exits the process: status 5 for "continue later" (JobContinue),
    1 for completion/abort paths, 0 on clean success via complete(0).
    """
    control = os.path.abspath(control)
    state = control + '.state'

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file, ensure we don't try and continue.
        if cont and not os.path.exists(state):
            raise JobComplete("all done")
        if cont == False and os.path.exists(state):
            # A fresh run must not inherit a stale state file.
            os.unlink(state)

        myjob = job(control, tag, cont, harness_type)

        # Load in the users control file, may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except JobContinue:
        sys.exit(5)

    except JobComplete:
        sys.exit(1)

    except JobError, instance:
        print "JOB ERROR: " + instance.args[0]
        if myjob:
            # A second JobError arg, when present, names the operation.
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
            myjob.group_level = 0
            myjob.record('ABORT', None, command, instance.args[0])
            myjob.record('END ABORT', None, None)
            myjob.complete(1)
        else:
            sys.exit(1)

    except Exception, e:
        msg = str(e) + '\n' + format_error()
        print "JOB ERROR: " + msg
        if myjob:
            myjob.group_level = 0
            myjob.record('ABORT', None, None, msg)
            myjob.record('END ABORT', None, None)
            myjob.complete(1)
        else:
            sys.exit(1)

    # If we get here, then we assume the job is complete and good.
    myjob.group_level = 0
    myjob.record('END GOOD', None, None)
    myjob.complete(0)