job.py revision c82f24696c6acb3efe618ce63c7de73185af0e14
"""The main job wrapper

This is the core infrastructure.
"""

__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""

# standard stuff
import os, sys, re, pickle, shutil
# autotest stuff
from autotest_utils import *
from parallel import *
import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool
import harness, config


class job:
    """The actual job against which we do everything.

    Properties:
        autodir
            The top level autotest directory (/usr/local/autotest).
            Comes from os.environ['AUTODIR'].
        bindir
            <autodir>/bin/
        testdir
            <autodir>/tests/
        profdir
            <autodir>/profilers/
        tmpdir
            <autodir>/tmp/
        resultdir
            <autodir>/results/<jobtag>
        control
            pathname of the control file for this job
        jobtag
            the job tag string (also available under the historical
            misspelling 'jobtab')
        stdout
            fd_stack object for stdout
        stderr
            fd_stack object for stderr
        record_prefix
            indentation prepended to lines written to the top level
            status file (one tab per enclosing run_group)
        profilers
            the profilers object for this job
        harness
            the server harness object for this job
        config
            the job configuration for this job
        bootloader
            boottool object; only set if it could be created
        steps
            list of pending steps used by the stepping engine
    """

    def __init__(self, control, jobtag, cont, harness_type=None):
        """
        control
            The control file (pathname of)
        jobtag
            The job tag string (eg "default")
        cont
            If this is the continuation of this job
        harness_type
            An alternative server harness
        """
        self.autodir = os.environ['AUTODIR']
        self.bindir = os.path.join(self.autodir, 'bin')
        self.testdir = os.path.join(self.autodir, 'tests')
        self.profdir = os.path.join(self.autodir, 'profilers')
        self.tmpdir = os.path.join(self.autodir, 'tmp')
        self.resultdir = os.path.join(self.autodir, 'results', jobtag)

        # A fresh (non-continuation) job starts from clean tmp,
        # download and result directories.
        if not cont:
            if os.path.exists(self.tmpdir):
                system('umount -f %s > /dev/null 2> /dev/null' %
                       self.tmpdir, ignorestatus=True)
                system('rm -rf ' + self.tmpdir)
            os.mkdir(self.tmpdir)

            results = os.path.join(self.autodir, 'results')
            if not os.path.exists(results):
                os.mkdir(results)

            download = os.path.join(self.testdir, 'download')
            if os.path.exists(download):
                system('rm -rf ' + download)
            os.mkdir(download)

            if os.path.exists(self.resultdir):
                system('rm -rf ' + self.resultdir)
            os.mkdir(self.resultdir)

            os.mkdir(os.path.join(self.resultdir, 'debug'))
            os.mkdir(os.path.join(self.resultdir, 'analysis'))
            os.mkdir(os.path.join(self.resultdir, 'sysinfo'))

            shutil.copyfile(control,
                            os.path.join(self.resultdir, 'control'))

        self.control = control
        self.jobtag = jobtag
        # Historical misspelling, kept so existing users of the
        # attribute keep working.
        self.jobtab = jobtag

        # Per-instance step list; without this every job object would
        # append to the single list stored on the class.
        self.steps = []

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)
        self.record_prefix = ''

        self.config = config.config(self)

        self.harness = harness.select(harness_type, self)

        self.profilers = profilers.profilers(self)

        # Best effort only: when no boot tool can be set up the job
        # still runs, but self.bootloader is left undefined and
        # reboot() will fail.
        try:
            tool = self.config_get('boottool.executable')
            self.bootloader = boottool.boottool(tool)
        except:
            pass

        # Collect system information from inside the sysinfo results
        # directory, making sure we always return to where we were.
        pwd = os.getcwd()
        os.chdir(os.path.join(self.resultdir, 'sysinfo'))
        try:
            system(os.path.join(self.bindir, 'sysinfo.py'))
            system('dmesg -c > dmesg', ignorestatus=1)
        finally:
            os.chdir(pwd)

        self.harness.run_start()


    def relative_path(self, path):
        """\
        Return a path relative to the job results directory

        The leading len(resultdir) + 1 characters are dropped without
        checking that 'path' really lives below the results directory.
        """
        head = len(self.resultdir) + 1     # remove the / inbetween
        return path[head:]


    def control_get(self):
        """Return the pathname of the control file"""
        return self.control


    def harness_select(self, which):
        """Replace the harness of this job with the one named 'which'"""
        self.harness = harness.select(which, self)


    def config_set(self, name, value):
        """Set the job configuration entry 'name' to 'value'"""
        self.config.set(name, value)


    def config_get(self, name):
        """Return the job configuration entry 'name'"""
        return self.config.get(name)


    def setup_dirs(self, results_dir, tmp_dir):
        """Pick and create the results and temporary build directories

        results_dir
            directory to use for results; when empty a fresh
            'build', 'build.2', 'build.3', ... below the job results
            directory is chosen
        tmp_dir
            directory to build in; when empty <tmpdir>/build is used

        Returns the tuple (results_dir, tmp_dir).  Raises ValueError
        if tmp_dir exists but is not a directory.
        """
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            raise ValueError(
                "Temp dir (%s) is not a dir - args backwards?" % tmp_dir)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir,
                                           'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False,
            kjob = None):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)


    def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.kernel(self, base_tree, results_dir, tmp_dir,
                             build_dir, leave)


    def barrier(self, *args):
        """Create a barrier object"""
        return barrier.barrier(*args)


    def setup_dep(self, deps):
        """Set up the dependencies for this test.

        deps is a list of libraries required for this test.

        For each entry the script <autodir>/deps/<dep>/<dep>.py is run
        from within its own directory; any failure is reported as an
        UnhandledError.  The working directory is left in the last
        dependency directory visited.
        """
        for dep in deps:
            try:
                os.chdir(os.path.join(self.autodir, 'deps', dep))
                system('./' + dep + '.py')
            except:
                error = "setting up dependency " + dep + "\n"
                raise UnhandledError(error)


    def __runtest(self, url, tag, args, dargs):
        """Run one test, turning unexpected errors into UnhandledError

        AutotestError (and subclasses) pass through untouched.
        """
        try:
            test.runtest(self, url, tag, args, dargs)
        except AutotestError:
            raise
        except:
            raise UnhandledError('running test ' +
                                 self.__class__.__name__ + "\n")


    def runtest(self, tag, url, *args):
        """Deprecated, always raises; use run_test instead"""
        raise RuntimeError(
            "Deprecated call to job.runtest. Use run_test instead")


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
            url of the test to run
        tag (keyword, optional)
            tag to add to testname
        reraise (keyword, optional)
            if true a TestError is passed on to the caller instead of
            being turned into a return value of 0

        Remaining positional and keyword arguments are handed to the
        test.  Returns 1 if the test completed, 0 if it raised a
        TestError (and reraise is false).  Any other exception is
        recorded as FAIL and propagated.  Raises ValueError if url is
        empty.
        """

        if not url:
            raise ValueError("Test name is invalid. Switched arguments?")
        (group, testname) = test.testname(url)
        subdir = testname
        tag = dargs.pop('tag', None)
        if tag:
            subdir += '.' + tag

        reraise = dargs.pop('reraise', False)

        try:
            try:
                self.__runtest(url, tag, args, dargs)
            except Exception:
                # sys.exc_info() rather than "except X, name" so the
                # handler reads the same on any interpreter version.
                detail = sys.exc_info()[1]
                self.record('FAIL', subdir, testname,
                            detail.__str__())

                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')
        except TestError:
            if reraise:
                raise
            else:
                return 0
        except:
            raise
        else:
            return 1


    def run_group(self, function, *args, **dargs):
        """\
        Run 'function' as a named group in the status log

        function:
            subroutine to run
        *args:
            arguments for the function
        tag (keyword, optional):
            name to record the group under instead of the function
            name

        Returns whatever the function returned, or None if it failed.
        """

        result = None
        name = function.__name__

        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        old_record_prefix = self.record_prefix
        try:
            try:
                self.record('START', None, name)
                self.record_prefix += '\t'
                result = function(*args, **dargs)
                self.record_prefix = old_record_prefix
                self.record('END GOOD', None, name)
            except:
                # NOTE(review): the failure is recorded but not
                # re-raised here, so the outer handlers only see
                # errors raised by record() itself - confirm that
                # this is intended.
                self.record_prefix = old_record_prefix
                self.record('END FAIL', None, name, format_error())
        # We don't want to raise up an error higher if it's just
        # a TestError - we want to carry on to other tests. Hence
        # this outer try/except block.
        except TestError:
            pass
        except:
            raise TestError(name + ' failed\n' + format_error())

        return result


    def kernel_check_ident(self, expected_when, expected_id, subdir):
        """Check the passed kernel identifier against the command line
        and the running kernel, abort the job on mismatch.

        expected_when
            integer mark expected as IDENT=<n> on the kernel command
            line
        expected_id
            identity expected from running_os_ident()
        subdir
            results subdirectory the boot status is recorded against

        Raises JobError("boot failure") on mismatch.
        """
        print("POST BOOT: checking booted kernel mark=%d identity='%s'"
              % (expected_when, expected_id))

        running_id = running_os_ident()

        cmdline = read_one_line("/proc/cmdline")

        find_sum = re.compile(r'.*IDENT=(\d+)')
        m = find_sum.match(cmdline)
        cmdline_when = -1
        if m:
            cmdline_when = int(m.groups()[0])

        # We have all the facts, see if they indicate we
        # booted the requested kernel or not.
        bad = False
        if expected_id != running_id:
            print("check_kernel_ident: kernel identifier mismatch")
            bad = True
        if expected_when != cmdline_when:
            print("check_kernel_ident: kernel command line mismatch")
            bad = True

        if bad:
            print(" Expected Ident: " + expected_id)
            print(" Running Ident: " + running_id)
            print(" Expected Mark: %d" % (expected_when))
            print("Command Line Mark: %d" % (cmdline_when))
            print(" Command Line: " + cmdline)

            raise JobError("boot failure")

        self.record('GOOD', subdir, 'boot', 'boot successful')


    def filesystem(self, device, mountpoint = None, loop_size = 0):
        """Summon a filesystem object, mounted on tmpdir by default"""
        if not mountpoint:
            mountpoint = self.tmpdir
        return filesystem.filesystem(self, device, mountpoint, loop_size)


    def reboot(self, tag='autotest'):
        """Boot the bootloader entry 'tag' once and pause the job

        Requires self.bootloader, which only exists if the boot tool
        could be set up when the job was created.
        """
        self.harness.run_reboot()
        self.bootloader.boot_once(tag)
        system("reboot")
        self.quit()


    def noop(self, text):
        """Print 'text'; does nothing else"""
        print("job: noop: " + text)


    # Job control primitives.

    def __parallel_execute(self, func, *args):
        """Call func(*args); the body of one parallel task"""
        func(*args)


    def parallel(self, *tasklist):
        """Run tasks in parallel

        Each task is a sequence whose first element is the callable
        and whose remaining elements are its arguments.  All tasks are
        started before the first one is waited for.
        """

        pids = []
        for task in tasklist:
            pids.append(fork_start(self.resultdir,
                        lambda: self.__parallel_execute(*task)))
        for pid in pids:
            fork_waitfor(self.resultdir, pid)


    def quit(self):
        """Pause the harness and raise JobContinue"""
        # XXX: should have a better name.
        self.harness.run_pause()
        raise JobContinue("more to come")


    def complete(self, status):
        """Clean up and exit"""
        # We are about to exit 'complete' so clean up the control file.
        try:
            os.unlink(self.control + '.state')
        except OSError:
            # No state file to remove (or it cannot be removed);
            # either way we are exiting.
            pass
        self.harness.run_complete()
        sys.exit(status)


    # Class level default; __init__ gives every job its own list.
    steps = []

    def _save_steps(self):
        """Pickle the pending step list into <control>.state"""
        state_file = open(self.control + '.state', 'w')
        try:
            pickle.dump(self.steps, state_file)
        finally:
            state_file.close()


    def next_step(self, step):
        """Define the next step

        step is a sequence: a function (or its name) followed by the
        arguments to call it with.  It is appended to the step list
        and the list is saved to the state file.
        """
        # Work on a copy so the caller's sequence is left alone.
        step = list(step)
        if not isinstance(step[0], basestring):
            step[0] = step[0].__name__
        self.steps.append(step)
        self._save_steps()


    def next_step_prepend(self, step):
        """Insert a new step, executing first"""
        # Work on a copy so the caller's sequence is left alone.
        step = list(step)
        if not isinstance(step[0], basestring):
            step[0] = step[0].__name__
        self.steps.insert(0, step)
        self._save_steps()


    def step_engine(self):
        """the stepping engine -- if the control file defines
        step_init we will be using this engine to drive multiple runs.

        The control file is executed with 'job' bound to this object,
        then every pending step is run in order.
        """
        lcl = dict({'job': self})

        preamble = """
from error import *
from autotest_utils import *
"""
        # NOTE: the control file is executed as code, and the state
        # file is unpickled; both must come from a trusted source.
        exec(preamble, lcl, lcl)
        execfile(self.control, lcl, lcl)

        state = self.control + '.state'
        # If there is a mid-job state file load that in and continue
        # where it indicates.  Otherwise start stepping at the passed
        # entry.
        try:
            state_file = open(state, 'r')
            try:
                self.steps = pickle.load(state_file)
            finally:
                state_file.close()
        except:
            if 'step_init' in lcl:
                self.next_step([lcl['step_init']])

        # Run the step list.
        while len(self.steps) > 0:
            step = self.steps.pop(0)
            # Save the remaining steps first, so a continuation of
            # the job resumes after the step we are about to run.
            self._save_steps()

            cmd = step.pop(0)
            lcl['__args'] = step
            exec(cmd + "(*__args)", lcl, lcl)


    def _append_line(self, path, line):
        """Append 'line' plus a newline to the file at 'path'"""
        status_file = open(path, "a")
        try:
            status_file.write(line + "\n")
        finally:
            status_file.close()


    def record(self, status_code, subdir, operation, status = ''):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: (GOOD|WARN|FAIL|ABORT)
                or   START
                or   END (GOOD|WARN|FAIL|ABORT)

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping, and is
        governed by self.record_prefix

        multiline messages have secondary lines prefaced by a double
        space ('  ')

        Raises ValueError for an unknown status code, or when subdir
        or operation contain a tab or newline.
        """

        # Tabs and newlines are the field and record separators, so
        # they must not appear anywhere in these fields (search, not
        # match: match would only look at the first character).
        if subdir:
            if re.search(r'[\n\t]', subdir):
                raise ValueError("Invalid character in subdir string")
            substr = subdir
        else:
            substr = '----'

        if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$',
                        status_code):
            raise ValueError(
                "Invalid status code supplied: %s" % status_code)
        if re.search(r'[\n\t]', operation):
            raise ValueError("Invalid character in operation string")
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", " ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        msg = '%s\t%s\t%s\t%s' % (status_code, substr, operation, status)

        self.harness.test_status_detail(status_code, substr,
                                        operation, status)
        self.harness.test_status(msg)
        print(msg)
        # The job-wide status file carries the group indentation, the
        # per-subdirectory one does not.
        self._append_line(os.path.join(self.resultdir, 'status'),
                          self.record_prefix + msg)
        if subdir:
            self._append_line(
                os.path.join(self.resultdir, subdir, 'status'), msg)


def runjob(control, cont = False, tag = "default", harness_type = ''):
    """The main interface to this module

    control
        The control file to use for this job.
    cont
        Whether this is the continuation of a previously started job
    tag
        The job tag string, names the results directory
    harness_type
        An alternative server harness

    Never returns normally: the process exits with 0 when the job
    completed, 1 on a job error (or when asked to continue a job that
    has no state file) and 5 when the job paused and is to be
    continued.  Any other exception is re-raised after the state file
    has been removed.
    """
    control = os.path.abspath(control)
    state = control + '.state'

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file, ensure we don't try and continue.
        if cont and not os.path.exists(state):
            sys.exit(1)
        if not cont and os.path.exists(state):
            os.unlink(state)

        myjob = job(control, tag, cont, harness_type)

        # Load in the users control file, may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except JobContinue:
        sys.exit(5)

    except JobError:
        instance = sys.exc_info()[1]
        print("JOB ERROR: " + instance.args[0])
        if myjob is None:
            # The error happened before the job object existed, so
            # there is nothing to record against; just fail.
            sys.exit(1)
        myjob.record('ABORT', None, instance.args[0])
        myjob.complete(1)

    except:
        if myjob:
            myjob.harness.run_abort()
        # Ensure we cannot continue this job, it is in rictus.
        if os.path.exists(state):
            os.unlink(state)
        raise

    # If we get here, then we assume the job is complete and good.
    myjob.complete(0)