# job.py revision 30270306182f4feb599278ce8a7412fa8526eb68
1"""The main job wrapper 2 3This is the core infrastructure. 4""" 5 6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006""" 7 8# standard stuff 9import os, sys, re, pickle, shutil, time 10# autotest stuff 11from autotest_utils import * 12from parallel import * 13from error import * 14import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool 15import harness, config 16import sysinfo 17 18class job: 19 """The actual job against which we do everything. 20 21 Properties: 22 autodir 23 The top level autotest directory (/usr/local/autotest). 24 Comes from os.environ['AUTODIR']. 25 bindir 26 <autodir>/bin/ 27 testdir 28 <autodir>/tests/ 29 profdir 30 <autodir>/profilers/ 31 tmpdir 32 <autodir>/tmp/ 33 resultdir 34 <autodir>/results/<jobtag> 35 stdout 36 fd_stack object for stdout 37 stderr 38 fd_stack object for stderr 39 profilers 40 the profilers object for this job 41 harness 42 the server harness object for this job 43 config 44 the job configuration for this job 45 """ 46 47 def __init__(self, control, jobtag, cont, harness_type=None): 48 """ 49 control 50 The control file (pathname of) 51 jobtag 52 The job tag string (eg "default") 53 cont 54 If this is the continuation of this job 55 harness_type 56 An alternative server harness 57 """ 58 self.autodir = os.environ['AUTODIR'] 59 self.bindir = os.path.join(self.autodir, 'bin') 60 self.testdir = os.path.join(self.autodir, 'tests') 61 self.profdir = os.path.join(self.autodir, 'profilers') 62 self.tmpdir = os.path.join(self.autodir, 'tmp') 63 self.resultdir = os.path.join(self.autodir, 'results', jobtag) 64 65 if not cont: 66 if os.path.exists(self.tmpdir): 67 system('umount -f %s > /dev/null 2> /dev/null'%\ 68 self.tmpdir, ignorestatus=True) 69 system('rm -rf ' + self.tmpdir) 70 os.mkdir(self.tmpdir) 71 72 results = os.path.join(self.autodir, 'results') 73 if not os.path.exists(results): 74 os.mkdir(results) 75 76 download = os.path.join(self.testdir, 'download') 77 if 
os.path.exists(download): 78 system('rm -rf ' + download) 79 os.mkdir(download) 80 81 if os.path.exists(self.resultdir): 82 system('rm -rf ' + self.resultdir) 83 os.mkdir(self.resultdir) 84 85 os.mkdir(os.path.join(self.resultdir, 'debug')) 86 os.mkdir(os.path.join(self.resultdir, 'analysis')) 87 os.mkdir(os.path.join(self.resultdir, 'sysinfo')) 88 89 shutil.copyfile(control, os.path.join(self.resultdir, 'control')) 90 91 self.control = control 92 self.jobtag = jobtag 93 94 self.stdout = fd_stack.fd_stack(1, sys.stdout) 95 self.stderr = fd_stack.fd_stack(2, sys.stderr) 96 self.record_prefix = '' 97 98 self.config = config.config(self) 99 100 self.harness = harness.select(harness_type, self) 101 102 self.profilers = profilers.profilers(self) 103 104 try: 105 tool = self.config_get('boottool.executable') 106 self.bootloader = boottool.boottool(tool) 107 except: 108 pass 109 110 # log "before each step" sysinfo 111 pwd = os.getcwd() 112 try: 113 os.chdir(os.path.join(self.resultdir, 'sysinfo')) 114 sysinfo.before_each_step() 115 finally: 116 os.chdir(pwd) 117 118 if not cont: 119 self.harness.run_start() 120 121 122 def relative_path(self, path): 123 """\ 124 Return a patch relative to the job results directory 125 """ 126 head = len(self.resultdir) + 1 # remove the / inbetween 127 return path[head:] 128 129 130 def control_get(self): 131 return self.control 132 133 134 def harness_select(self, which): 135 self.harness = harness.select(which, self) 136 137 138 def config_set(self, name, value): 139 self.config.set(name, value) 140 141 142 def config_get(self, name): 143 return self.config.get(name) 144 145 def setup_dirs(self, results_dir, tmp_dir): 146 if not tmp_dir: 147 tmp_dir = os.path.join(self.tmpdir, 'build') 148 if not os.path.exists(tmp_dir): 149 os.mkdir(tmp_dir) 150 if not os.path.isdir(tmp_dir): 151 raise "Temp dir (%s) is not a dir - args backwards?" 
\ 152 % self.tmpdir 153 154 # We label the first build "build" and then subsequent ones 155 # as "build.2", "build.3", etc. Whilst this is a little bit 156 # inconsistent, 99.9% of jobs will only have one build 157 # (that's not done as kernbench, sparse, or buildtest), 158 # so it works out much cleaner. One of life's comprimises. 159 if not results_dir: 160 results_dir = os.path.join(self.resultdir, 'build') 161 i = 2 162 while os.path.exists(results_dir): 163 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 164 i += 1 165 if not os.path.exists(results_dir): 166 os.mkdir(results_dir) 167 168 return (results_dir, tmp_dir) 169 170 171 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 172 kjob = None ): 173 """Summon a xen object""" 174 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 175 build_dir = 'xen' 176 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob) 177 178 179 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 180 """Summon a kernel object""" 181 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 182 if base_tree.endswith('.rpm'): 183 return kernel.rpm_kernel(self, base_tree, results_dir) 184 build_dir = 'linux' 185 return kernel.kernel(self, base_tree, results_dir, tmp_dir, build_dir, leave) 186 187 188 def barrier(self, *args): 189 """Create a barrier object""" 190 return barrier.barrier(*args) 191 192 193 def setup_dep(self, deps): 194 """Set up the dependencies for this test. 195 196 deps is a list of libraries required for this test. 
197 """ 198 for dep in deps: 199 try: 200 os.chdir(os.path.join(self.autodir, 'deps', dep)) 201 system('./' + dep + '.py') 202 except: 203 error = "setting up dependency " + dep + "\n" 204 raise UnhandledError(error) 205 206 207 def __runtest(self, url, tag, args, dargs): 208 try: 209 l = lambda : test.runtest(self, url, tag, args, dargs) 210 pid = fork_start(self.resultdir, l) 211 fork_waitfor(self.resultdir, pid) 212 except AutotestError: 213 raise 214 except: 215 raise UnhandledError('running test ' + \ 216 self.__class__.__name__ + "\n") 217 218 219 def run_test(self, url, *args, **dargs): 220 """Summon a test object and run it. 221 222 tag 223 tag to add to testname 224 url 225 url of the test to run 226 """ 227 228 if not url: 229 raise "Test name is invalid. Switched arguments?" 230 (group, testname) = test.testname(url) 231 tag = None 232 subdir = testname 233 if dargs.has_key('tag'): 234 tag = dargs['tag'] 235 del dargs['tag'] 236 if tag: 237 subdir += '.' + tag 238 try: 239 try: 240 self.__runtest(url, tag, args, dargs) 241 except Exception, detail: 242 self.record('FAIL', subdir, testname, \ 243 detail.__str__()) 244 245 raise 246 else: 247 self.record('GOOD', subdir, testname, \ 248 'completed successfully') 249 except TestError: 250 return 0 251 except: 252 raise 253 else: 254 return 1 255 256 257 def run_group(self, function, *args, **dargs): 258 """\ 259 function: 260 subroutine to run 261 *args: 262 arguments for the function 263 """ 264 265 result = None 266 name = function.__name__ 267 268 # Allow the tag for the group to be specified. 269 if dargs.has_key('tag'): 270 tag = dargs['tag'] 271 del dargs['tag'] 272 if tag: 273 name = tag 274 275 # if tag: 276 # name += '.' 
+ tag 277 old_record_prefix = self.record_prefix 278 try: 279 try: 280 self.record('START', None, name) 281 self.record_prefix += '\t' 282 result = function(*args, **dargs) 283 self.record_prefix = old_record_prefix 284 self.record('END GOOD', None, name) 285 except: 286 self.record_prefix = old_record_prefix 287 self.record('END FAIL', None, name, format_error()) 288 # We don't want to raise up an error higher if it's just 289 # a TestError - we want to carry on to other tests. Hence 290 # this outer try/except block. 291 except TestError: 292 pass 293 except: 294 raise TestError(name + ' failed\n' + format_error()) 295 296 return result 297 298 299 # Check the passed kernel identifier against the command line 300 # and the running kernel, abort the job on missmatch. 301 def kernel_check_ident(self, expected_when, expected_id, expected_cl, subdir, type = 'src'): 302 print "POST BOOT: checking booted kernel mark=%d identity='%s' changelist=%s type='%s'" \ 303 % (expected_when, expected_id, expected_cl, type) 304 305 running_id = running_os_ident() 306 307 cmdline = read_one_line("/proc/cmdline") 308 309 find_sum = re.compile(r'.*IDENT=(\d+)') 310 m = find_sum.match(cmdline) 311 cmdline_when = -1 312 if m: 313 cmdline_when = int(m.groups()[0]) 314 315 cl_re = re.compile(r'\d{7,}') 316 cl_match = cl_re.search(system_output('uname -v').split()[1]) 317 if cl_match: 318 current_cl = cl_match.group() 319 else: 320 current_cl = None 321 322 # We have all the facts, see if they indicate we 323 # booted the requested kernel or not. 
324 bad = False 325 if (type == 'src' and expected_id != running_id or 326 type == 'rpm' and not running_id.startswith(expected_id + '::')): 327 print "check_kernel_ident: kernel identifier mismatch" 328 bad = True 329 if expected_when != cmdline_when: 330 print "check_kernel_ident: kernel command line mismatch" 331 bad = True 332 if expected_cl and current_cl and str(expected_cl) != current_cl: 333 print 'check_kernel_ident: kernel changelist mismatch' 334 bad = True 335 336 if bad: 337 print " Expected Ident: " + expected_id 338 print " Running Ident: " + running_id 339 print " Expected Mark: %d" % (expected_when) 340 print "Command Line Mark: %d" % (cmdline_when) 341 print " Expected P4 CL: %s" % expected_cl 342 print " P4 CL: %s" % current_cl 343 print " Command Line: " + cmdline 344 345 raise JobError("boot failure", "reboot.verify") 346 347 self.record('GOOD', subdir, 'reboot.verify') 348 349 350 def filesystem(self, device, mountpoint = None, loop_size = 0): 351 if not mountpoint: 352 mountpoint = self.tmpdir 353 return filesystem.filesystem(self, device, mountpoint,loop_size) 354 355 356 def reboot(self, tag='autotest'): 357 self.record('GOOD', None, 'reboot.start') 358 self.harness.run_reboot() 359 default = self.config_get('boot.set_default') 360 if default: 361 self.bootloader.set_default(tag) 362 else: 363 self.bootloader.boot_once(tag) 364 system("(sleep 5; reboot) &") 365 self.quit() 366 367 368 def noop(self, text): 369 print "job: noop: " + text 370 371 372 # Job control primatives. 373 374 def __parallel_execute(self, func, *args): 375 func(*args) 376 377 378 def parallel(self, *tasklist): 379 """Run tasks in parallel""" 380 381 pids = [] 382 for task in tasklist: 383 pids.append(fork_start(self.resultdir, 384 lambda: self.__parallel_execute(*task))) 385 for pid in pids: 386 fork_waitfor(self.resultdir, pid) 387 388 389 def quit(self): 390 # XXX: should have a better name. 
391 self.harness.run_pause() 392 raise JobContinue("more to come") 393 394 395 def complete(self, status): 396 """Clean up and exit""" 397 # We are about to exit 'complete' so clean up the control file. 398 try: 399 os.unlink(self.control + '.state') 400 except: 401 pass 402 self.harness.run_complete() 403 sys.exit(status) 404 405 406 steps = [] 407 def next_step(self, step): 408 """Define the next step""" 409 if not isinstance(step[0], basestring): 410 step[0] = step[0].__name__ 411 self.steps.append(step) 412 pickle.dump(self.steps, open(self.control + '.state', 'w')) 413 414 415 def next_step_prepend(self, step): 416 """Insert a new step, executing first""" 417 if not isinstance(step[0], basestring): 418 step[0] = step[0].__name__ 419 self.steps.insert(0, step) 420 pickle.dump(self.steps, open(self.control + '.state', 'w')) 421 422 423 def step_engine(self): 424 """the stepping engine -- if the control file defines 425 step_init we will be using this engine to drive multiple runs. 426 """ 427 """Do the next step""" 428 lcl = dict({'job': self}) 429 430 str = """ 431from error import * 432from autotest_utils import * 433""" 434 exec(str, lcl, lcl) 435 execfile(self.control, lcl, lcl) 436 437 state = self.control + '.state' 438 # If there is a mid-job state file load that in and continue 439 # where it indicates. Otherwise start stepping at the passed 440 # entry. 441 try: 442 self.steps = pickle.load(open(state, 'r')) 443 except: 444 if lcl.has_key('step_init'): 445 self.next_step([lcl['step_init']]) 446 447 # Run the step list. 448 while len(self.steps) > 0: 449 step = self.steps.pop(0) 450 pickle.dump(self.steps, open(state, 'w')) 451 452 cmd = step.pop(0) 453 lcl['__args'] = step 454 exec(cmd + "(*__args)", lcl, lcl) 455 456 457 def record(self, status_code, subdir, operation, status = ''): 458 """ 459 Record job-level status 460 461 The intent is to make this file both machine parseable and 462 human readable. 
That involves a little more complexity, but 463 really isn't all that bad ;-) 464 465 Format is <status code>\t<subdir>\t<operation>\t<status> 466 467 status code: (GOOD|WARN|FAIL|ABORT) 468 or START 469 or END (GOOD|WARN|FAIL|ABORT) 470 471 subdir: MUST be a relevant subdirectory in the results, 472 or None, which will be represented as '----' 473 474 operation: description of what you ran (e.g. "dbench", or 475 "mkfs -t foobar /dev/sda9") 476 477 status: error message or "completed sucessfully" 478 479 ------------------------------------------------------------ 480 481 Initial tabs indicate indent levels for grouping, and is 482 governed by self.record_prefix 483 484 multiline messages have secondary lines prefaced by a double 485 space (' ') 486 """ 487 488 if subdir: 489 if re.match(r'[\n\t]', subdir): 490 raise "Invalid character in subdir string" 491 substr = subdir 492 else: 493 substr = '----' 494 495 if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \ 496 status_code): 497 raise "Invalid status code supplied: %s" % status_code 498 if not operation: 499 operation = '----' 500 if re.match(r'[\n\t]', operation): 501 raise "Invalid character in operation string" 502 operation = operation.rstrip() 503 status = status.rstrip() 504 status = re.sub(r"\t", " ", status) 505 # Ensure any continuation lines are marked so we can 506 # detect them in the status file to ensure it is parsable. 
507 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status) 508 509 # Generate timestamps for inclusion in the logs 510 epoch_time = int(time.time()) # seconds since epoch, in UTC 511 local_time = time.localtime(epoch_time) 512 epoch_time_str = "timestamp=%d" % (epoch_time,) 513 local_time_str = time.strftime("localtime=%b %d %H:%M:%S", 514 local_time) 515 516 msg = '\t'.join(str(x) for x in (status_code, substr, operation, 517 epoch_time_str, local_time_str, 518 status)) 519 520 self.harness.test_status_detail(status_code, substr, 521 operation, status) 522 self.harness.test_status(msg) 523 print msg 524 status_file = os.path.join(self.resultdir, 'status') 525 open(status_file, "a").write(self.record_prefix + msg + "\n") 526 if subdir: 527 status_file = os.path.join(self.resultdir, subdir, 'status') 528 open(status_file, "a").write(msg + "\n") 529 530 531def runjob(control, cont = False, tag = "default", harness_type = ''): 532 """The main interface to this module 533 534 control 535 The control file to use for this job. 536 cont 537 Whether this is the continuation of a previously started job 538 """ 539 control = os.path.abspath(control) 540 state = control + '.state' 541 542 # instantiate the job object ready for the control file. 543 myjob = None 544 try: 545 # Check that the control file is valid 546 if not os.path.exists(control): 547 raise JobError(control + ": control file not found") 548 549 # When continuing, the job is complete when there is no 550 # state file, ensure we don't try and continue. 
551 if cont and not os.path.exists(state): 552 sys.exit(1) 553 if cont == False and os.path.exists(state): 554 os.unlink(state) 555 556 myjob = job(control, tag, cont, harness_type) 557 558 # Load in the users control file, may do any one of: 559 # 1) execute in toto 560 # 2) define steps, and select the first via next_step() 561 myjob.step_engine() 562 563 except JobContinue: 564 sys.exit(5) 565 566 except JobError, instance: 567 print "JOB ERROR: " + instance.args[0] 568 if myjob: 569 command = None 570 if len(instance.args) > 1: 571 command = instance.args[1] 572 myjob.record('ABORT', None, command, instance.args[0]) 573 myjob.complete(1) 574 575 576 except: 577 print "JOB ERROR: " + format_error() 578 if myjob: 579 myjob.record('ABORT', None, None, format_error()) 580 myjob.complete(1) 581 582 # If we get here, then we assume the job is complete and good. 583 myjob.record('GOOD', None, None, 'job completed sucessfully') 584 myjob.complete(0) 585 586 587