job.py revision 8d83cdc678ea7dcfc6780a029d39f841f548d859
1"""The main job wrapper 2 3This is the core infrastructure. 4""" 5 6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006""" 7 8# standard stuff 9import os, sys, re, pickle, shutil, time, traceback 10# autotest stuff 11from autotest_utils import * 12from parallel import * 13from common.error import * 14import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool 15import harness, config 16import sysinfo 17 18class job: 19 """The actual job against which we do everything. 20 21 Properties: 22 autodir 23 The top level autotest directory (/usr/local/autotest). 24 Comes from os.environ['AUTODIR']. 25 bindir 26 <autodir>/bin/ 27 testdir 28 <autodir>/tests/ 29 profdir 30 <autodir>/profilers/ 31 tmpdir 32 <autodir>/tmp/ 33 resultdir 34 <autodir>/results/<jobtag> 35 stdout 36 fd_stack object for stdout 37 stderr 38 fd_stack object for stderr 39 profilers 40 the profilers object for this job 41 harness 42 the server harness object for this job 43 config 44 the job configuration for this job 45 """ 46 47 def __init__(self, control, jobtag, cont, harness_type=None): 48 """ 49 control 50 The control file (pathname of) 51 jobtag 52 The job tag string (eg "default") 53 cont 54 If this is the continuation of this job 55 harness_type 56 An alternative server harness 57 """ 58 self.autodir = os.environ['AUTODIR'] 59 self.bindir = os.path.join(self.autodir, 'bin') 60 self.testdir = os.path.join(self.autodir, 'tests') 61 self.profdir = os.path.join(self.autodir, 'profilers') 62 self.tmpdir = os.path.join(self.autodir, 'tmp') 63 self.resultdir = os.path.join(self.autodir, 'results', jobtag) 64 self.control = os.path.abspath(control) 65 66 if not cont: 67 if os.path.exists(self.tmpdir): 68 system('umount -f %s > /dev/null 2> /dev/null'%\ 69 self.tmpdir, ignorestatus=True) 70 system('rm -rf ' + self.tmpdir) 71 os.mkdir(self.tmpdir) 72 73 results = os.path.join(self.autodir, 'results') 74 if not os.path.exists(results): 75 os.mkdir(results) 76 77 download = os.path.join(self.testdir, 'download') 78 if os.path.exists(download): 79 system('rm -rf ' + download) 80 os.mkdir(download) 81 82 if os.path.exists(self.resultdir): 83 system('rm -rf ' + self.resultdir) 84 os.mkdir(self.resultdir) 85 86 os.mkdir(os.path.join(self.resultdir, 'debug')) 87 os.mkdir(os.path.join(self.resultdir, 'analysis')) 88 os.mkdir(os.path.join(self.resultdir, 'sysinfo')) 89 90 shutil.copyfile(self.control, 91 os.path.join(self.resultdir, 'control')) 92 93 self.control = control 94 self.jobtag = jobtag 95 96 self.stdout = fd_stack.fd_stack(1, sys.stdout) 97 self.stderr = fd_stack.fd_stack(2, sys.stderr) 98 self.group_level = 0 99 100 self.config = config.config(self) 101 102 self.harness = harness.select(harness_type, self) 103 104 self.profilers = profilers.profilers(self) 105 106 try: 107 tool = self.config_get('boottool.executable') 108 self.bootloader = boottool.boottool(tool) 109 except: 110 pass 111 112 # log "before each step" sysinfo 113 pwd = os.getcwd() 114 try: 115 os.chdir(os.path.join(self.resultdir, 'sysinfo')) 116 sysinfo.before_each_step() 117 finally: 118 os.chdir(pwd) 119 120 if not cont: 121 self.record('START', None, None) 122 self.group_level = 1 123 124 self.harness.run_start() 125 126 127 def relative_path(self, path): 128 """\ 129 Return a patch relative to the job results directory 130 """ 131 head = len(self.resultdir) + 1 # remove the / inbetween 132 return path[head:] 133 134 135 def control_get(self): 136 return self.control 137 138 139 def control_set(self, control): 140 self.control = os.path.abspath(control) 141 142 143 def harness_select(self, which): 144 self.harness = harness.select(which, self) 145 146 147 def config_set(self, name, value): 148 self.config.set(name, value) 149 150 151 def config_get(self, name): 152 return self.config.get(name) 153 154 def setup_dirs(self, results_dir, tmp_dir): 155 if not tmp_dir: 156 tmp_dir = os.path.join(self.tmpdir, 'build') 157 if not os.path.exists(tmp_dir): 158 os.mkdir(tmp_dir) 159 if not os.path.isdir(tmp_dir): 160 raise "Temp dir (%s) is not a dir - args backwards?" \ 161 % self.tmpdir 162 163 # We label the first build "build" and then subsequent ones 164 # as "build.2", "build.3", etc. Whilst this is a little bit 165 # inconsistent, 99.9% of jobs will only have one build 166 # (that's not done as kernbench, sparse, or buildtest), 167 # so it works out much cleaner. One of life's comprimises. 168 if not results_dir: 169 results_dir = os.path.join(self.resultdir, 'build') 170 i = 2 171 while os.path.exists(results_dir): 172 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 173 i += 1 174 if not os.path.exists(results_dir): 175 os.mkdir(results_dir) 176 177 return (results_dir, tmp_dir) 178 179 180 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 181 kjob = None ): 182 """Summon a xen object""" 183 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 184 build_dir = 'xen' 185 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob) 186 187 188 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 189 """Summon a kernel object""" 190 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 191 build_dir = 'linux' 192 return kernel.auto_kernel(self, base_tree, results_dir, 193 tmp_dir, build_dir, leave) 194 195 196 def barrier(self, *args): 197 """Create a barrier object""" 198 return barrier.barrier(*args) 199 200 201 def setup_dep(self, deps): 202 """Set up the dependencies for this test. 203 204 deps is a list of libraries required for this test. 205 """ 206 for dep in deps: 207 try: 208 os.chdir(os.path.join(self.autodir, 'deps', dep)) 209 system('./' + dep + '.py') 210 except: 211 error = "setting up dependency " + dep + "\n" 212 raise UnhandledError(error) 213 214 215 def __runtest(self, url, tag, args, dargs): 216 try: 217 l = lambda : test.runtest(self, url, tag, args, dargs) 218 pid = fork_start(self.resultdir, l) 219 fork_waitfor(self.resultdir, pid) 220 except AutotestError: 221 raise 222 except: 223 raise UnhandledError('running test ' + \ 224 self.__class__.__name__ + "\n") 225 226 227 def run_test(self, url, *args, **dargs): 228 """Summon a test object and run it. 229 230 tag 231 tag to add to testname 232 url 233 url of the test to run 234 """ 235 236 if not url: 237 raise "Test name is invalid. Switched arguments?" 238 (group, testname) = test.testname(url) 239 tag = dargs.pop('tag', None) 240 subdir = testname 241 if tag: 242 subdir += '.' + tag 243 244 def group_func(): 245 try: 246 self.__runtest(url, tag, args, dargs) 247 except Exception, detail: 248 self.record('FAIL', subdir, testname, 249 str(detail)) 250 raise 251 else: 252 self.record('GOOD', subdir, testname, 253 'completed successfully') 254 result, exc_info = self.__rungroup(subdir, group_func) 255 256 if exc_info and isinstance(exc_info[1], TestError): 257 return False 258 elif exc_info: 259 raise exc_info[0], exc_info[1], exc_info[2] 260 else: 261 return True 262 263 264 def __rungroup(self, name, function, *args, **dargs): 265 """\ 266 name: 267 name of the group 268 function: 269 subroutine to run 270 *args: 271 arguments for the function 272 273 Returns a 2-tuple (result, exc_info) where result 274 is the return value of function, and exc_info is 275 the sys.exc_info() of the exception thrown by the 276 function (which may be None). 277 """ 278 279 result, exc_info = None, None 280 try: 281 self.record('START', None, name) 282 self.group_level += 1 283 result = function(*args, **dargs) 284 self.group_level -= 1 285 self.record('END GOOD', None, name) 286 except Exception, e: 287 exc_info = sys.exc_info() 288 self.group_level -= 1 289 err_msg = str(e) + '\n' + format_error() 290 self.record('END FAIL', None, name, err_msg) 291 292 return result, exc_info 293 294 295 def run_group(self, function, *args, **dargs): 296 """\ 297 function: 298 subroutine to run 299 *args: 300 arguments for the function 301 """ 302 303 # Allow the tag for the group to be specified 304 name = function.__name__ 305 tag = dargs.pop('tag', None) 306 if tag: 307 name = tag 308 309 result, exc_info = self.__rungroup(name, function, 310 *args, **dargs) 311 312 # if there was a non-TestError exception, raise it 313 if exc_info and isinstance(exc_info[1], TestError): 314 err = ''.join(traceback.format_exception(*exc_info)) 315 raise TestError(name + ' failed\n' + err) 316 317 # pass back the actual return value from the function 318 return result 319 320 321 # Check the passed kernel identifier against the command line 322 # and the running kernel, abort the job on missmatch. 323 def kernel_check_ident(self, expected_when, expected_id, expected_cl, subdir, type = 'src'): 324 print "POST BOOT: checking booted kernel mark=%d identity='%s' changelist=%s type='%s'" \ 325 % (expected_when, expected_id, expected_cl, type) 326 327 running_id = running_os_ident() 328 329 cmdline = read_one_line("/proc/cmdline") 330 331 find_sum = re.compile(r'.*IDENT=(\d+)') 332 m = find_sum.match(cmdline) 333 cmdline_when = -1 334 if m: 335 cmdline_when = int(m.groups()[0]) 336 337 cl_re = re.compile(r'\d{7,}') 338 cl_match = cl_re.search(system_output('uname -v').split()[1]) 339 if cl_match: 340 current_cl = cl_match.group() 341 else: 342 current_cl = None 343 344 # We have all the facts, see if they indicate we 345 # booted the requested kernel or not. 346 bad = False 347 if (type == 'src' and expected_id != running_id or 348 type == 'rpm' and not running_id.startswith(expected_id + '::')): 349 print "check_kernel_ident: kernel identifier mismatch" 350 bad = True 351 if expected_when != cmdline_when: 352 print "check_kernel_ident: kernel command line mismatch" 353 bad = True 354 if expected_cl and current_cl and str(expected_cl) != current_cl: 355 print 'check_kernel_ident: kernel changelist mismatch' 356 bad = True 357 358 if bad: 359 print " Expected Ident: " + expected_id 360 print " Running Ident: " + running_id 361 print " Expected Mark: %d" % (expected_when) 362 print "Command Line Mark: %d" % (cmdline_when) 363 print " Expected P4 CL: %s" % expected_cl 364 print " P4 CL: %s" % current_cl 365 print " Command Line: " + cmdline 366 367 raise JobError("boot failure", "reboot.verify") 368 369 self.record('GOOD', subdir, 'reboot.verify') 370 371 372 def filesystem(self, device, mountpoint = None, loop_size = 0): 373 if not mountpoint: 374 mountpoint = self.tmpdir 375 return filesystem.filesystem(self, device, mountpoint,loop_size) 376 377 378 def reboot(self, tag='autotest'): 379 self.record('GOOD', None, 'reboot.start') 380 self.harness.run_reboot() 381 default = self.config_get('boot.set_default') 382 if default: 383 self.bootloader.set_default(tag) 384 else: 385 self.bootloader.boot_once(tag) 386 system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 387 self.quit() 388 389 390 def noop(self, text): 391 print "job: noop: " + text 392 393 394 # Job control primatives. 395 396 def __parallel_execute(self, func, *args): 397 func(*args) 398 399 400 def parallel(self, *tasklist): 401 """Run tasks in parallel""" 402 403 pids = [] 404 for task in tasklist: 405 pids.append(fork_start(self.resultdir, 406 lambda: self.__parallel_execute(*task))) 407 for pid in pids: 408 fork_waitfor(self.resultdir, pid) 409 410 411 def quit(self): 412 # XXX: should have a better name. 413 self.harness.run_pause() 414 raise JobContinue("more to come") 415 416 417 def complete(self, status): 418 """Clean up and exit""" 419 # We are about to exit 'complete' so clean up the control file. 420 try: 421 os.unlink(self.control + '.state') 422 except: 423 pass 424 self.harness.run_complete() 425 sys.exit(status) 426 427 428 steps = [] 429 def next_step(self, step): 430 """Define the next step""" 431 if not isinstance(step[0], basestring): 432 step[0] = step[0].__name__ 433 self.steps.append(step) 434 pickle.dump(self.steps, open(self.control + '.state', 'w')) 435 436 437 def next_step_prepend(self, step): 438 """Insert a new step, executing first""" 439 if not isinstance(step[0], basestring): 440 step[0] = step[0].__name__ 441 self.steps.insert(0, step) 442 pickle.dump(self.steps, open(self.control + '.state', 'w')) 443 444 445 def step_engine(self): 446 """the stepping engine -- if the control file defines 447 step_init we will be using this engine to drive multiple runs. 448 """ 449 """Do the next step""" 450 lcl = dict({'job': self}) 451 452 str = """ 453from common.error import * 454from autotest_utils import * 455""" 456 exec(str, lcl, lcl) 457 execfile(self.control, lcl, lcl) 458 459 state = self.control + '.state' 460 # If there is a mid-job state file load that in and continue 461 # where it indicates. Otherwise start stepping at the passed 462 # entry. 463 try: 464 self.steps = pickle.load(open(state, 'r')) 465 except: 466 if lcl.has_key('step_init'): 467 self.next_step([lcl['step_init']]) 468 469 # Run the step list. 470 while len(self.steps) > 0: 471 step = self.steps.pop(0) 472 pickle.dump(self.steps, open(state, 'w')) 473 474 cmd = step.pop(0) 475 lcl['__args'] = step 476 exec(cmd + "(*__args)", lcl, lcl) 477 478 479 def record(self, status_code, subdir, operation, status = ''): 480 """ 481 Record job-level status 482 483 The intent is to make this file both machine parseable and 484 human readable. That involves a little more complexity, but 485 really isn't all that bad ;-) 486 487 Format is <status code>\t<subdir>\t<operation>\t<status> 488 489 status code: (GOOD|WARN|FAIL|ABORT) 490 or START 491 or END (GOOD|WARN|FAIL|ABORT) 492 493 subdir: MUST be a relevant subdirectory in the results, 494 or None, which will be represented as '----' 495 496 operation: description of what you ran (e.g. "dbench", or 497 "mkfs -t foobar /dev/sda9") 498 499 status: error message or "completed sucessfully" 500 501 ------------------------------------------------------------ 502 503 Initial tabs indicate indent levels for grouping, and is 504 governed by self.group_level 505 506 multiline messages have secondary lines prefaced by a double 507 space (' ') 508 """ 509 510 if subdir: 511 if re.match(r'[\n\t]', subdir): 512 raise "Invalid character in subdir string" 513 substr = subdir 514 else: 515 substr = '----' 516 517 if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \ 518 status_code): 519 raise "Invalid status code supplied: %s" % status_code 520 if not operation: 521 operation = '----' 522 if re.match(r'[\n\t]', operation): 523 raise "Invalid character in operation string" 524 operation = operation.rstrip() 525 status = status.rstrip() 526 status = re.sub(r"\t", " ", status) 527 # Ensure any continuation lines are marked so we can 528 # detect them in the status file to ensure it is parsable. 529 status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", status) 530 531 # Generate timestamps for inclusion in the logs 532 epoch_time = int(time.time()) # seconds since epoch, in UTC 533 local_time = time.localtime(epoch_time) 534 epoch_time_str = "timestamp=%d" % (epoch_time,) 535 local_time_str = time.strftime("localtime=%b %d %H:%M:%S", 536 local_time) 537 538 msg = '\t'.join(str(x) for x in (status_code, substr, operation, 539 epoch_time_str, local_time_str, 540 status)) 541 msg = '\t' * self.group_level + msg 542 543 self.harness.test_status_detail(status_code, substr, 544 operation, status) 545 self.harness.test_status(msg) 546 print msg 547 status_file = os.path.join(self.resultdir, 'status') 548 open(status_file, "a").write(msg + "\n") 549 if subdir: 550 status_file = os.path.join(self.resultdir, subdir, 'status') 551 open(status_file, "a").write(msg + "\n") 552 553 554def runjob(control, cont = False, tag = "default", harness_type = ''): 555 """The main interface to this module 556 557 control 558 The control file to use for this job. 559 cont 560 Whether this is the continuation of a previously started job 561 """ 562 control = os.path.abspath(control) 563 state = control + '.state' 564 565 # instantiate the job object ready for the control file. 566 myjob = None 567 try: 568 # Check that the control file is valid 569 if not os.path.exists(control): 570 raise JobError(control + ": control file not found") 571 572 # When continuing, the job is complete when there is no 573 # state file, ensure we don't try and continue. 574 if cont and not os.path.exists(state): 575 raise JobComplete("all done") 576 if cont == False and os.path.exists(state): 577 os.unlink(state) 578 579 myjob = job(control, tag, cont, harness_type) 580 581 # Load in the users control file, may do any one of: 582 # 1) execute in toto 583 # 2) define steps, and select the first via next_step() 584 myjob.step_engine() 585 586 except JobContinue: 587 sys.exit(5) 588 589 except JobComplete: 590 sys.exit(1) 591 592 except JobError, instance: 593 print "JOB ERROR: " + instance.args[0] 594 if myjob: 595 command = None 596 if len(instance.args) > 1: 597 command = instance.args[1] 598 myjob.group_level = 0 599 myjob.record('ABORT', None, command, instance.args[0]) 600 myjob.record('END ABORT', None, None) 601 myjob.complete(1) 602 else: 603 sys.exit(1) 604 605 except Exception, e: 606 msg = str(e) + '\n' + format_error() 607 print "JOB ERROR: " + msg 608 if myjob: 609 myjob.group_level = 0 610 myjob.record('ABORT', None, None, msg) 611 myjob.record('END ABORT', None, None) 612 myjob.complete(1) 613 else: 614 sys.exit(1) 615 616 # If we get here, then we assume the job is complete and good. 617 myjob.group_level = 0 618 myjob.record('END GOOD', None, None) 619 myjob.complete(0) 620