# job.py revision f91efaf53bf840d9545c3c0ceaa928d8fbb6df3b
1"""The main job wrapper 2 3This is the core infrastructure. 4""" 5 6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006""" 7 8# standard stuff 9import os, sys, re, pickle, shutil, time, traceback 10# autotest stuff 11from autotest_utils import * 12from parallel import * 13from error import * 14import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool 15import harness, config 16import sysinfo 17 18class job: 19 """The actual job against which we do everything. 20 21 Properties: 22 autodir 23 The top level autotest directory (/usr/local/autotest). 24 Comes from os.environ['AUTODIR']. 25 bindir 26 <autodir>/bin/ 27 testdir 28 <autodir>/tests/ 29 profdir 30 <autodir>/profilers/ 31 tmpdir 32 <autodir>/tmp/ 33 resultdir 34 <autodir>/results/<jobtag> 35 stdout 36 fd_stack object for stdout 37 stderr 38 fd_stack object for stderr 39 profilers 40 the profilers object for this job 41 harness 42 the server harness object for this job 43 config 44 the job configuration for this job 45 """ 46 47 def __init__(self, control, jobtag, cont, harness_type=None): 48 """ 49 control 50 The control file (pathname of) 51 jobtag 52 The job tag string (eg "default") 53 cont 54 If this is the continuation of this job 55 harness_type 56 An alternative server harness 57 """ 58 self.autodir = os.environ['AUTODIR'] 59 self.bindir = os.path.join(self.autodir, 'bin') 60 self.testdir = os.path.join(self.autodir, 'tests') 61 self.profdir = os.path.join(self.autodir, 'profilers') 62 self.tmpdir = os.path.join(self.autodir, 'tmp') 63 self.resultdir = os.path.join(self.autodir, 'results', jobtag) 64 65 if not cont: 66 if os.path.exists(self.tmpdir): 67 system('umount -f %s > /dev/null 2> /dev/null'%\ 68 self.tmpdir, ignorestatus=True) 69 system('rm -rf ' + self.tmpdir) 70 os.mkdir(self.tmpdir) 71 72 results = os.path.join(self.autodir, 'results') 73 if not os.path.exists(results): 74 os.mkdir(results) 75 76 download = os.path.join(self.testdir, 'download') 77 if 
os.path.exists(download): 78 system('rm -rf ' + download) 79 os.mkdir(download) 80 81 if os.path.exists(self.resultdir): 82 system('rm -rf ' + self.resultdir) 83 os.mkdir(self.resultdir) 84 85 os.mkdir(os.path.join(self.resultdir, 'debug')) 86 os.mkdir(os.path.join(self.resultdir, 'analysis')) 87 os.mkdir(os.path.join(self.resultdir, 'sysinfo')) 88 89 shutil.copyfile(control, os.path.join(self.resultdir, 'control')) 90 91 self.control = control 92 self.jobtag = jobtag 93 94 self.stdout = fd_stack.fd_stack(1, sys.stdout) 95 self.stderr = fd_stack.fd_stack(2, sys.stderr) 96 self.group_level = 0 97 98 self.config = config.config(self) 99 100 self.harness = harness.select(harness_type, self) 101 102 self.profilers = profilers.profilers(self) 103 104 try: 105 tool = self.config_get('boottool.executable') 106 self.bootloader = boottool.boottool(tool) 107 except: 108 pass 109 110 # log "before each step" sysinfo 111 pwd = os.getcwd() 112 try: 113 os.chdir(os.path.join(self.resultdir, 'sysinfo')) 114 sysinfo.before_each_step() 115 finally: 116 os.chdir(pwd) 117 118 if not cont: 119 self.record('START', None, None) 120 self.group_level = 1 121 122 self.harness.run_start() 123 124 125 def relative_path(self, path): 126 """\ 127 Return a patch relative to the job results directory 128 """ 129 head = len(self.resultdir) + 1 # remove the / inbetween 130 return path[head:] 131 132 133 def control_get(self): 134 return self.control 135 136 137 def harness_select(self, which): 138 self.harness = harness.select(which, self) 139 140 141 def config_set(self, name, value): 142 self.config.set(name, value) 143 144 145 def config_get(self, name): 146 return self.config.get(name) 147 148 def setup_dirs(self, results_dir, tmp_dir): 149 if not tmp_dir: 150 tmp_dir = os.path.join(self.tmpdir, 'build') 151 if not os.path.exists(tmp_dir): 152 os.mkdir(tmp_dir) 153 if not os.path.isdir(tmp_dir): 154 raise "Temp dir (%s) is not a dir - args backwards?" 
\ 155 % self.tmpdir 156 157 # We label the first build "build" and then subsequent ones 158 # as "build.2", "build.3", etc. Whilst this is a little bit 159 # inconsistent, 99.9% of jobs will only have one build 160 # (that's not done as kernbench, sparse, or buildtest), 161 # so it works out much cleaner. One of life's comprimises. 162 if not results_dir: 163 results_dir = os.path.join(self.resultdir, 'build') 164 i = 2 165 while os.path.exists(results_dir): 166 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 167 i += 1 168 if not os.path.exists(results_dir): 169 os.mkdir(results_dir) 170 171 return (results_dir, tmp_dir) 172 173 174 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 175 kjob = None ): 176 """Summon a xen object""" 177 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 178 build_dir = 'xen' 179 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob) 180 181 182 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 183 """Summon a kernel object""" 184 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 185 build_dir = 'linux' 186 return kernel.auto_kernel(self, base_tree, results_dir, 187 tmp_dir, build_dir, leave) 188 189 190 def barrier(self, *args): 191 """Create a barrier object""" 192 return barrier.barrier(*args) 193 194 195 def setup_dep(self, deps): 196 """Set up the dependencies for this test. 197 198 deps is a list of libraries required for this test. 
199 """ 200 for dep in deps: 201 try: 202 os.chdir(os.path.join(self.autodir, 'deps', dep)) 203 system('./' + dep + '.py') 204 except: 205 error = "setting up dependency " + dep + "\n" 206 raise UnhandledError(error) 207 208 209 def __runtest(self, url, tag, args, dargs): 210 try: 211 l = lambda : test.runtest(self, url, tag, args, dargs) 212 pid = fork_start(self.resultdir, l) 213 fork_waitfor(self.resultdir, pid) 214 except AutotestError: 215 raise 216 except: 217 raise UnhandledError('running test ' + \ 218 self.__class__.__name__ + "\n") 219 220 221 def run_test(self, url, *args, **dargs): 222 """Summon a test object and run it. 223 224 tag 225 tag to add to testname 226 url 227 url of the test to run 228 """ 229 230 if not url: 231 raise "Test name is invalid. Switched arguments?" 232 (group, testname) = test.testname(url) 233 tag = dargs.pop('tag', None) 234 subdir = testname 235 if tag: 236 subdir += '.' + tag 237 238 def group_func(): 239 try: 240 self.__runtest(url, tag, args, dargs) 241 except Exception, detail: 242 self.record('FAIL', subdir, testname, 243 str(detail)) 244 raise 245 else: 246 self.record('GOOD', subdir, testname, 247 'completed successfully') 248 result, exc_info = self.__rungroup(subdir, group_func) 249 250 if exc_info and isinstance(exc_info[1], TestError): 251 return False 252 elif exc_info: 253 raise exc_info[0], exc_info[1], exc_info[2] 254 else: 255 return True 256 257 258 def __rungroup(self, name, function, *args, **dargs): 259 """\ 260 name: 261 name of the group 262 function: 263 subroutine to run 264 *args: 265 arguments for the function 266 267 Returns a 2-tuple (result, exc_info) where result 268 is the return value of function, and exc_info is 269 the sys.exc_info() of the exception thrown by the 270 function (which may be None). 
271 """ 272 273 result, exc_info = None, None 274 try: 275 self.record('START', None, name) 276 self.group_level += 1 277 result = function(*args, **dargs) 278 self.group_level -= 1 279 self.record('END GOOD', None, name) 280 except Exception, e: 281 exc_info = sys.exc_info() 282 self.group_level -= 1 283 err_msg = str(e) + '\n' + format_error() 284 self.record('END FAIL', None, name, err_msg) 285 286 return result, exc_info 287 288 289 def run_group(self, function, *args, **dargs): 290 """\ 291 function: 292 subroutine to run 293 *args: 294 arguments for the function 295 """ 296 297 # Allow the tag for the group to be specified 298 name = function.__name__ 299 tag = dargs.pop('tag', None) 300 if tag: 301 name = tag 302 303 result, exc_info = self.__rungroup(name, function, 304 *args, **dargs) 305 306 # if there was a non-TestError exception, raise it 307 if exc_info and isinstance(exc_info[1], TestError): 308 err = ''.join(traceback.format_exception(*exc_info)) 309 raise TestError(name + ' failed\n' + err) 310 311 # pass back the actual return value from the function 312 return result 313 314 315 # Check the passed kernel identifier against the command line 316 # and the running kernel, abort the job on missmatch. 
317 def kernel_check_ident(self, expected_when, expected_id, expected_cl, subdir, type = 'src'): 318 print "POST BOOT: checking booted kernel mark=%d identity='%s' changelist=%s type='%s'" \ 319 % (expected_when, expected_id, expected_cl, type) 320 321 running_id = running_os_ident() 322 323 cmdline = read_one_line("/proc/cmdline") 324 325 find_sum = re.compile(r'.*IDENT=(\d+)') 326 m = find_sum.match(cmdline) 327 cmdline_when = -1 328 if m: 329 cmdline_when = int(m.groups()[0]) 330 331 cl_re = re.compile(r'\d{7,}') 332 cl_match = cl_re.search(system_output('uname -v').split()[1]) 333 if cl_match: 334 current_cl = cl_match.group() 335 else: 336 current_cl = None 337 338 # We have all the facts, see if they indicate we 339 # booted the requested kernel or not. 340 bad = False 341 if (type == 'src' and expected_id != running_id or 342 type == 'rpm' and not running_id.startswith(expected_id + '::')): 343 print "check_kernel_ident: kernel identifier mismatch" 344 bad = True 345 if expected_when != cmdline_when: 346 print "check_kernel_ident: kernel command line mismatch" 347 bad = True 348 if expected_cl and current_cl and str(expected_cl) != current_cl: 349 print 'check_kernel_ident: kernel changelist mismatch' 350 bad = True 351 352 if bad: 353 print " Expected Ident: " + expected_id 354 print " Running Ident: " + running_id 355 print " Expected Mark: %d" % (expected_when) 356 print "Command Line Mark: %d" % (cmdline_when) 357 print " Expected P4 CL: %s" % expected_cl 358 print " P4 CL: %s" % current_cl 359 print " Command Line: " + cmdline 360 361 raise JobError("boot failure", "reboot.verify") 362 363 self.record('GOOD', subdir, 'reboot.verify') 364 365 366 def filesystem(self, device, mountpoint = None, loop_size = 0): 367 if not mountpoint: 368 mountpoint = self.tmpdir 369 return filesystem.filesystem(self, device, mountpoint,loop_size) 370 371 372 def reboot(self, tag='autotest'): 373 self.record('GOOD', None, 'reboot.start') 374 self.harness.run_reboot() 375 
default = self.config_get('boot.set_default') 376 if default: 377 self.bootloader.set_default(tag) 378 else: 379 self.bootloader.boot_once(tag) 380 system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 381 self.quit() 382 383 384 def noop(self, text): 385 print "job: noop: " + text 386 387 388 # Job control primatives. 389 390 def __parallel_execute(self, func, *args): 391 func(*args) 392 393 394 def parallel(self, *tasklist): 395 """Run tasks in parallel""" 396 397 pids = [] 398 for task in tasklist: 399 pids.append(fork_start(self.resultdir, 400 lambda: self.__parallel_execute(*task))) 401 for pid in pids: 402 fork_waitfor(self.resultdir, pid) 403 404 405 def quit(self): 406 # XXX: should have a better name. 407 self.harness.run_pause() 408 raise JobContinue("more to come") 409 410 411 def complete(self, status): 412 """Clean up and exit""" 413 # We are about to exit 'complete' so clean up the control file. 414 try: 415 os.unlink(self.control + '.state') 416 except: 417 pass 418 self.harness.run_complete() 419 sys.exit(status) 420 421 422 steps = [] 423 def next_step(self, step): 424 """Define the next step""" 425 if not isinstance(step[0], basestring): 426 step[0] = step[0].__name__ 427 self.steps.append(step) 428 pickle.dump(self.steps, open(self.control + '.state', 'w')) 429 430 431 def next_step_prepend(self, step): 432 """Insert a new step, executing first""" 433 if not isinstance(step[0], basestring): 434 step[0] = step[0].__name__ 435 self.steps.insert(0, step) 436 pickle.dump(self.steps, open(self.control + '.state', 'w')) 437 438 439 def step_engine(self): 440 """the stepping engine -- if the control file defines 441 step_init we will be using this engine to drive multiple runs. 
442 """ 443 """Do the next step""" 444 lcl = dict({'job': self}) 445 446 str = """ 447from error import * 448from autotest_utils import * 449""" 450 exec(str, lcl, lcl) 451 execfile(self.control, lcl, lcl) 452 453 state = self.control + '.state' 454 # If there is a mid-job state file load that in and continue 455 # where it indicates. Otherwise start stepping at the passed 456 # entry. 457 try: 458 self.steps = pickle.load(open(state, 'r')) 459 except: 460 if lcl.has_key('step_init'): 461 self.next_step([lcl['step_init']]) 462 463 # Run the step list. 464 while len(self.steps) > 0: 465 step = self.steps.pop(0) 466 pickle.dump(self.steps, open(state, 'w')) 467 468 cmd = step.pop(0) 469 lcl['__args'] = step 470 exec(cmd + "(*__args)", lcl, lcl) 471 472 473 def record(self, status_code, subdir, operation, status = ''): 474 """ 475 Record job-level status 476 477 The intent is to make this file both machine parseable and 478 human readable. That involves a little more complexity, but 479 really isn't all that bad ;-) 480 481 Format is <status code>\t<subdir>\t<operation>\t<status> 482 483 status code: (GOOD|WARN|FAIL|ABORT) 484 or START 485 or END (GOOD|WARN|FAIL|ABORT) 486 487 subdir: MUST be a relevant subdirectory in the results, 488 or None, which will be represented as '----' 489 490 operation: description of what you ran (e.g. 
"dbench", or 491 "mkfs -t foobar /dev/sda9") 492 493 status: error message or "completed sucessfully" 494 495 ------------------------------------------------------------ 496 497 Initial tabs indicate indent levels for grouping, and is 498 governed by self.group_level 499 500 multiline messages have secondary lines prefaced by a double 501 space (' ') 502 """ 503 504 if subdir: 505 if re.match(r'[\n\t]', subdir): 506 raise "Invalid character in subdir string" 507 substr = subdir 508 else: 509 substr = '----' 510 511 if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \ 512 status_code): 513 raise "Invalid status code supplied: %s" % status_code 514 if not operation: 515 operation = '----' 516 if re.match(r'[\n\t]', operation): 517 raise "Invalid character in operation string" 518 operation = operation.rstrip() 519 status = status.rstrip() 520 status = re.sub(r"\t", " ", status) 521 # Ensure any continuation lines are marked so we can 522 # detect them in the status file to ensure it is parsable. 
523 status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", status) 524 525 # Generate timestamps for inclusion in the logs 526 epoch_time = int(time.time()) # seconds since epoch, in UTC 527 local_time = time.localtime(epoch_time) 528 epoch_time_str = "timestamp=%d" % (epoch_time,) 529 local_time_str = time.strftime("localtime=%b %d %H:%M:%S", 530 local_time) 531 532 msg = '\t'.join(str(x) for x in (status_code, substr, operation, 533 epoch_time_str, local_time_str, 534 status)) 535 msg = '\t' * self.group_level + msg 536 537 self.harness.test_status_detail(status_code, substr, 538 operation, status) 539 self.harness.test_status(msg) 540 print msg 541 status_file = os.path.join(self.resultdir, 'status') 542 open(status_file, "a").write(msg + "\n") 543 if subdir: 544 status_file = os.path.join(self.resultdir, subdir, 'status') 545 open(status_file, "a").write(msg + "\n") 546 547 548def runjob(control, cont = False, tag = "default", harness_type = ''): 549 """The main interface to this module 550 551 control 552 The control file to use for this job. 553 cont 554 Whether this is the continuation of a previously started job 555 """ 556 control = os.path.abspath(control) 557 state = control + '.state' 558 559 # instantiate the job object ready for the control file. 560 myjob = None 561 try: 562 # Check that the control file is valid 563 if not os.path.exists(control): 564 raise JobError(control + ": control file not found") 565 566 # When continuing, the job is complete when there is no 567 # state file, ensure we don't try and continue. 
568 if cont and not os.path.exists(state): 569 sys.exit(1) 570 if cont == False and os.path.exists(state): 571 os.unlink(state) 572 573 myjob = job(control, tag, cont, harness_type) 574 575 # Load in the users control file, may do any one of: 576 # 1) execute in toto 577 # 2) define steps, and select the first via next_step() 578 myjob.step_engine() 579 580 except JobContinue: 581 sys.exit(5) 582 583 except JobError, instance: 584 print "JOB ERROR: " + instance.args[0] 585 if myjob: 586 command = None 587 if len(instance.args) > 1: 588 command = instance.args[1] 589 myjob.group_level = 0 590 myjob.record('ABORT', None, command, instance.args[0]) 591 myjob.record('END ABORT', None, None) 592 myjob.complete(1) 593 594 except Exception, e: 595 msg = str(e) + '\n' + format_error() 596 print "JOB ERROR: " + msg 597 if myjob: 598 myjob.group_level = 0 599 myjob.record('ABORT', None, None, msg) 600 myjob.record('END ABORT', None, None) 601 myjob.complete(1) 602 603 # If we get here, then we assume the job is complete and good. 604 myjob.group_level = 0 605 myjob.record('END GOOD', None, None) 606 myjob.complete(0) 607