# job.py revision cfc6dd3f4fa1b68921c785e720574168486e49ff
1"""The main job wrapper 2 3This is the core infrastructure. 4""" 5 6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006""" 7 8# standard stuff 9import os, sys, re, pickle, shutil, time, traceback 10# autotest stuff 11from autotest_utils import * 12from parallel import * 13from error import * 14import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool 15import harness, config 16import sysinfo 17 18class job: 19 """The actual job against which we do everything. 20 21 Properties: 22 autodir 23 The top level autotest directory (/usr/local/autotest). 24 Comes from os.environ['AUTODIR']. 25 bindir 26 <autodir>/bin/ 27 testdir 28 <autodir>/tests/ 29 profdir 30 <autodir>/profilers/ 31 tmpdir 32 <autodir>/tmp/ 33 resultdir 34 <autodir>/results/<jobtag> 35 stdout 36 fd_stack object for stdout 37 stderr 38 fd_stack object for stderr 39 profilers 40 the profilers object for this job 41 harness 42 the server harness object for this job 43 config 44 the job configuration for this job 45 """ 46 47 def __init__(self, control, jobtag, cont, harness_type=None): 48 """ 49 control 50 The control file (pathname of) 51 jobtag 52 The job tag string (eg "default") 53 cont 54 If this is the continuation of this job 55 harness_type 56 An alternative server harness 57 """ 58 self.autodir = os.environ['AUTODIR'] 59 self.bindir = os.path.join(self.autodir, 'bin') 60 self.testdir = os.path.join(self.autodir, 'tests') 61 self.profdir = os.path.join(self.autodir, 'profilers') 62 self.tmpdir = os.path.join(self.autodir, 'tmp') 63 self.resultdir = os.path.join(self.autodir, 'results', jobtag) 64 65 if not cont: 66 if os.path.exists(self.tmpdir): 67 system('umount -f %s > /dev/null 2> /dev/null'%\ 68 self.tmpdir, ignorestatus=True) 69 system('rm -rf ' + self.tmpdir) 70 os.mkdir(self.tmpdir) 71 72 results = os.path.join(self.autodir, 'results') 73 if not os.path.exists(results): 74 os.mkdir(results) 75 76 download = os.path.join(self.testdir, 'download') 77 if 
os.path.exists(download): 78 system('rm -rf ' + download) 79 os.mkdir(download) 80 81 if os.path.exists(self.resultdir): 82 system('rm -rf ' + self.resultdir) 83 os.mkdir(self.resultdir) 84 85 os.mkdir(os.path.join(self.resultdir, 'debug')) 86 os.mkdir(os.path.join(self.resultdir, 'analysis')) 87 os.mkdir(os.path.join(self.resultdir, 'sysinfo')) 88 89 shutil.copyfile(control, os.path.join(self.resultdir, 'control')) 90 91 self.control = control 92 self.jobtag = jobtag 93 94 self.stdout = fd_stack.fd_stack(1, sys.stdout) 95 self.stderr = fd_stack.fd_stack(2, sys.stderr) 96 self.group_level = 0 97 98 self.config = config.config(self) 99 100 self.harness = harness.select(harness_type, self) 101 102 self.profilers = profilers.profilers(self) 103 104 try: 105 tool = self.config_get('boottool.executable') 106 self.bootloader = boottool.boottool(tool) 107 except: 108 pass 109 110 # log "before each step" sysinfo 111 pwd = os.getcwd() 112 try: 113 os.chdir(os.path.join(self.resultdir, 'sysinfo')) 114 sysinfo.before_each_step() 115 finally: 116 os.chdir(pwd) 117 118 if not cont: 119 self.record('START', None, None) 120 self.harness.run_start() 121 self.group_level = 1 122 123 124 def relative_path(self, path): 125 """\ 126 Return a patch relative to the job results directory 127 """ 128 head = len(self.resultdir) + 1 # remove the / inbetween 129 return path[head:] 130 131 132 def control_get(self): 133 return self.control 134 135 136 def harness_select(self, which): 137 self.harness = harness.select(which, self) 138 139 140 def config_set(self, name, value): 141 self.config.set(name, value) 142 143 144 def config_get(self, name): 145 return self.config.get(name) 146 147 def setup_dirs(self, results_dir, tmp_dir): 148 if not tmp_dir: 149 tmp_dir = os.path.join(self.tmpdir, 'build') 150 if not os.path.exists(tmp_dir): 151 os.mkdir(tmp_dir) 152 if not os.path.isdir(tmp_dir): 153 raise "Temp dir (%s) is not a dir - args backwards?" 
\ 154 % self.tmpdir 155 156 # We label the first build "build" and then subsequent ones 157 # as "build.2", "build.3", etc. Whilst this is a little bit 158 # inconsistent, 99.9% of jobs will only have one build 159 # (that's not done as kernbench, sparse, or buildtest), 160 # so it works out much cleaner. One of life's comprimises. 161 if not results_dir: 162 results_dir = os.path.join(self.resultdir, 'build') 163 i = 2 164 while os.path.exists(results_dir): 165 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 166 i += 1 167 if not os.path.exists(results_dir): 168 os.mkdir(results_dir) 169 170 return (results_dir, tmp_dir) 171 172 173 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 174 kjob = None ): 175 """Summon a xen object""" 176 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 177 build_dir = 'xen' 178 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob) 179 180 181 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 182 """Summon a kernel object""" 183 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 184 build_dir = 'linux' 185 return kernel.auto_kernel(self, base_tree, results_dir, 186 tmp_dir, build_dir, leave) 187 188 189 def barrier(self, *args): 190 """Create a barrier object""" 191 return barrier.barrier(*args) 192 193 194 def setup_dep(self, deps): 195 """Set up the dependencies for this test. 196 197 deps is a list of libraries required for this test. 
198 """ 199 for dep in deps: 200 try: 201 os.chdir(os.path.join(self.autodir, 'deps', dep)) 202 system('./' + dep + '.py') 203 except: 204 error = "setting up dependency " + dep + "\n" 205 raise UnhandledError(error) 206 207 208 def __runtest(self, url, tag, args, dargs): 209 try: 210 l = lambda : test.runtest(self, url, tag, args, dargs) 211 pid = fork_start(self.resultdir, l) 212 fork_waitfor(self.resultdir, pid) 213 except AutotestError: 214 raise 215 except: 216 raise UnhandledError('running test ' + \ 217 self.__class__.__name__ + "\n") 218 219 220 def run_test(self, url, *args, **dargs): 221 """Summon a test object and run it. 222 223 tag 224 tag to add to testname 225 url 226 url of the test to run 227 """ 228 229 if not url: 230 raise "Test name is invalid. Switched arguments?" 231 (group, testname) = test.testname(url) 232 tag = dargs.pop('tag', None) 233 subdir = testname 234 if tag: 235 subdir += '.' + tag 236 237 def group_func(): 238 try: 239 self.__runtest(url, tag, args, dargs) 240 except Exception, detail: 241 self.record('FAIL', subdir, testname, 242 str(detail)) 243 raise 244 else: 245 self.record('GOOD', subdir, testname, 246 'completed successfully') 247 result, exc_info = self.__rungroup(subdir, group_func) 248 249 if exc_info and isinstance(exc_info[1], TestError): 250 return False 251 elif exc_info: 252 raise exc_info[0], exc_info[1], exc_info[2] 253 else: 254 return True 255 256 257 def __rungroup(self, name, function, *args, **dargs): 258 """\ 259 name: 260 name of the group 261 function: 262 subroutine to run 263 *args: 264 arguments for the function 265 266 Returns a 2-tuple (result, exc_info) where result 267 is the return value of function, and exc_info is 268 the sys.exc_info() of the exception thrown by the 269 function (which may be None). 
270 """ 271 272 result, exc_info = None, None 273 try: 274 self.record('START', None, name) 275 self.group_level += 1 276 result = function(*args, **dargs) 277 self.group_level -= 1 278 self.record('END GOOD', None, name) 279 except Exception, e: 280 exc_info = sys.exc_info() 281 self.group_level -= 1 282 self.record('END FAIL', None, name, format_error()) 283 284 return result, exc_info 285 286 287 def run_group(self, function, *args, **dargs): 288 """\ 289 function: 290 subroutine to run 291 *args: 292 arguments for the function 293 """ 294 295 # Allow the tag for the group to be specified 296 name = function.__name__ 297 tag = dargs.pop('tag', None) 298 if tag: 299 name = tag 300 301 result, exc_info = self.__rungroup(name, function, 302 *args, **dargs) 303 304 # if there was a non-TestError exception, raise it 305 if exc_info and isinstance(exc_info[1], TestError): 306 err = ''.join(traceback.format_exception(*exc_info)) 307 raise TestError(name + ' failed\n' + err) 308 309 # pass back the actual return value from the function 310 return result 311 312 313 # Check the passed kernel identifier against the command line 314 # and the running kernel, abort the job on missmatch. 315 def kernel_check_ident(self, expected_when, expected_id, expected_cl, subdir, type = 'src'): 316 print "POST BOOT: checking booted kernel mark=%d identity='%s' changelist=%s type='%s'" \ 317 % (expected_when, expected_id, expected_cl, type) 318 319 running_id = running_os_ident() 320 321 cmdline = read_one_line("/proc/cmdline") 322 323 find_sum = re.compile(r'.*IDENT=(\d+)') 324 m = find_sum.match(cmdline) 325 cmdline_when = -1 326 if m: 327 cmdline_when = int(m.groups()[0]) 328 329 cl_re = re.compile(r'\d{7,}') 330 cl_match = cl_re.search(system_output('uname -v').split()[1]) 331 if cl_match: 332 current_cl = cl_match.group() 333 else: 334 current_cl = None 335 336 # We have all the facts, see if they indicate we 337 # booted the requested kernel or not. 
338 bad = False 339 if (type == 'src' and expected_id != running_id or 340 type == 'rpm' and not running_id.startswith(expected_id + '::')): 341 print "check_kernel_ident: kernel identifier mismatch" 342 bad = True 343 if expected_when != cmdline_when: 344 print "check_kernel_ident: kernel command line mismatch" 345 bad = True 346 if expected_cl and current_cl and str(expected_cl) != current_cl: 347 print 'check_kernel_ident: kernel changelist mismatch' 348 bad = True 349 350 if bad: 351 print " Expected Ident: " + expected_id 352 print " Running Ident: " + running_id 353 print " Expected Mark: %d" % (expected_when) 354 print "Command Line Mark: %d" % (cmdline_when) 355 print " Expected P4 CL: %s" % expected_cl 356 print " P4 CL: %s" % current_cl 357 print " Command Line: " + cmdline 358 359 raise JobError("boot failure", "reboot.verify") 360 361 self.record('GOOD', subdir, 'reboot.verify') 362 363 364 def filesystem(self, device, mountpoint = None, loop_size = 0): 365 if not mountpoint: 366 mountpoint = self.tmpdir 367 return filesystem.filesystem(self, device, mountpoint,loop_size) 368 369 370 def reboot(self, tag='autotest'): 371 self.record('GOOD', None, 'reboot.start') 372 self.harness.run_reboot() 373 default = self.config_get('boot.set_default') 374 if default: 375 self.bootloader.set_default(tag) 376 else: 377 self.bootloader.boot_once(tag) 378 system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 379 self.quit() 380 381 382 def noop(self, text): 383 print "job: noop: " + text 384 385 386 # Job control primatives. 387 388 def __parallel_execute(self, func, *args): 389 func(*args) 390 391 392 def parallel(self, *tasklist): 393 """Run tasks in parallel""" 394 395 pids = [] 396 for task in tasklist: 397 pids.append(fork_start(self.resultdir, 398 lambda: self.__parallel_execute(*task))) 399 for pid in pids: 400 fork_waitfor(self.resultdir, pid) 401 402 403 def quit(self): 404 # XXX: should have a better name. 
405 self.harness.run_pause() 406 raise JobContinue("more to come") 407 408 409 def complete(self, status): 410 """Clean up and exit""" 411 # We are about to exit 'complete' so clean up the control file. 412 try: 413 os.unlink(self.control + '.state') 414 except: 415 pass 416 self.harness.run_complete() 417 sys.exit(status) 418 419 420 steps = [] 421 def next_step(self, step): 422 """Define the next step""" 423 if not isinstance(step[0], basestring): 424 step[0] = step[0].__name__ 425 self.steps.append(step) 426 pickle.dump(self.steps, open(self.control + '.state', 'w')) 427 428 429 def next_step_prepend(self, step): 430 """Insert a new step, executing first""" 431 if not isinstance(step[0], basestring): 432 step[0] = step[0].__name__ 433 self.steps.insert(0, step) 434 pickle.dump(self.steps, open(self.control + '.state', 'w')) 435 436 437 def step_engine(self): 438 """the stepping engine -- if the control file defines 439 step_init we will be using this engine to drive multiple runs. 440 """ 441 """Do the next step""" 442 lcl = dict({'job': self}) 443 444 str = """ 445from error import * 446from autotest_utils import * 447""" 448 exec(str, lcl, lcl) 449 execfile(self.control, lcl, lcl) 450 451 state = self.control + '.state' 452 # If there is a mid-job state file load that in and continue 453 # where it indicates. Otherwise start stepping at the passed 454 # entry. 455 try: 456 self.steps = pickle.load(open(state, 'r')) 457 except: 458 if lcl.has_key('step_init'): 459 self.next_step([lcl['step_init']]) 460 461 # Run the step list. 462 while len(self.steps) > 0: 463 step = self.steps.pop(0) 464 pickle.dump(self.steps, open(state, 'w')) 465 466 cmd = step.pop(0) 467 lcl['__args'] = step 468 exec(cmd + "(*__args)", lcl, lcl) 469 470 471 def record(self, status_code, subdir, operation, status = ''): 472 """ 473 Record job-level status 474 475 The intent is to make this file both machine parseable and 476 human readable. 
That involves a little more complexity, but 477 really isn't all that bad ;-) 478 479 Format is <status code>\t<subdir>\t<operation>\t<status> 480 481 status code: (GOOD|WARN|FAIL|ABORT) 482 or START 483 or END (GOOD|WARN|FAIL|ABORT) 484 485 subdir: MUST be a relevant subdirectory in the results, 486 or None, which will be represented as '----' 487 488 operation: description of what you ran (e.g. "dbench", or 489 "mkfs -t foobar /dev/sda9") 490 491 status: error message or "completed sucessfully" 492 493 ------------------------------------------------------------ 494 495 Initial tabs indicate indent levels for grouping, and is 496 governed by self.group_level 497 498 multiline messages have secondary lines prefaced by a double 499 space (' ') 500 """ 501 502 if subdir: 503 if re.match(r'[\n\t]', subdir): 504 raise "Invalid character in subdir string" 505 substr = subdir 506 else: 507 substr = '----' 508 509 if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \ 510 status_code): 511 raise "Invalid status code supplied: %s" % status_code 512 if not operation: 513 operation = '----' 514 if re.match(r'[\n\t]', operation): 515 raise "Invalid character in operation string" 516 operation = operation.rstrip() 517 status = status.rstrip() 518 status = re.sub(r"\t", " ", status) 519 # Ensure any continuation lines are marked so we can 520 # detect them in the status file to ensure it is parsable. 
521 status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", status) 522 523 # Generate timestamps for inclusion in the logs 524 epoch_time = int(time.time()) # seconds since epoch, in UTC 525 local_time = time.localtime(epoch_time) 526 epoch_time_str = "timestamp=%d" % (epoch_time,) 527 local_time_str = time.strftime("localtime=%b %d %H:%M:%S", 528 local_time) 529 530 msg = '\t'.join(str(x) for x in (status_code, substr, operation, 531 epoch_time_str, local_time_str, 532 status)) 533 msg = '\t' * self.group_level + msg 534 535 self.harness.test_status_detail(status_code, substr, 536 operation, status) 537 self.harness.test_status(msg) 538 print msg 539 status_file = os.path.join(self.resultdir, 'status') 540 open(status_file, "a").write(msg + "\n") 541 if subdir: 542 status_file = os.path.join(self.resultdir, subdir, 'status') 543 open(status_file, "a").write(msg + "\n") 544 545 546def runjob(control, cont = False, tag = "default", harness_type = ''): 547 """The main interface to this module 548 549 control 550 The control file to use for this job. 551 cont 552 Whether this is the continuation of a previously started job 553 """ 554 control = os.path.abspath(control) 555 state = control + '.state' 556 557 # instantiate the job object ready for the control file. 558 myjob = None 559 try: 560 # Check that the control file is valid 561 if not os.path.exists(control): 562 raise JobError(control + ": control file not found") 563 564 # When continuing, the job is complete when there is no 565 # state file, ensure we don't try and continue. 
566 if cont and not os.path.exists(state): 567 sys.exit(1) 568 if cont == False and os.path.exists(state): 569 os.unlink(state) 570 571 myjob = job(control, tag, cont, harness_type) 572 573 # Load in the users control file, may do any one of: 574 # 1) execute in toto 575 # 2) define steps, and select the first via next_step() 576 myjob.step_engine() 577 578 except JobContinue: 579 sys.exit(5) 580 581 except JobError, instance: 582 print "JOB ERROR: " + instance.args[0] 583 if myjob: 584 command = None 585 if len(instance.args) > 1: 586 command = instance.args[1] 587 myjob.group_level = 0 588 myjob.record('ABORT', None, command, instance.args[0]) 589 myjob.record('END ABORT', None, None) 590 myjob.complete(1) 591 592 except Exception, e: 593 msg = format_error() 594 print "JOB ERROR: " + msg 595 if myjob: 596 myjob.group_level = 0 597 myjob.record('ABORT', None, None, msg) 598 myjob.record('END ABORT', None, None) 599 myjob.complete(1) 600 601 # If we get here, then we assume the job is complete and good. 602 myjob.group_level = 0 603 myjob.record('END GOOD', None, None) 604 myjob.complete(0) 605