job.py revision 08403caa1b9fd69b2ccb79b0c9fcb6e7e0b1913c
1"""The main job wrapper 2 3This is the core infrastructure. 4""" 5 6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006""" 7 8# standard stuff 9import os, sys, re, pickle, shutil 10# autotest stuff 11from autotest_utils import * 12from parallel import * 13import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool 14import harness, config 15 16class job: 17 """The actual job against which we do everything. 18 19 Properties: 20 autodir 21 The top level autotest directory (/usr/local/autotest). 22 Comes from os.environ['AUTODIR']. 23 bindir 24 <autodir>/bin/ 25 testdir 26 <autodir>/tests/ 27 profdir 28 <autodir>/profilers/ 29 tmpdir 30 <autodir>/tmp/ 31 resultdir 32 <autodir>/results/<jobtag> 33 stdout 34 fd_stack object for stdout 35 stderr 36 fd_stack object for stderr 37 profilers 38 the profilers object for this job 39 harness 40 the server harness object for this job 41 config 42 the job configuration for this job 43 """ 44 45 def __init__(self, control, jobtag, cont, harness_type=None): 46 """ 47 control 48 The control file (pathname of) 49 jobtag 50 The job tag string (eg "default") 51 cont 52 If this is the continuation of this job 53 harness_type 54 An alternative server harness 55 """ 56 self.autodir = os.environ['AUTODIR'] 57 self.bindir = os.path.join(self.autodir, 'bin') 58 self.testdir = os.path.join(self.autodir, 'tests') 59 self.profdir = os.path.join(self.autodir, 'profilers') 60 self.tmpdir = os.path.join(self.autodir, 'tmp') 61 self.resultdir = os.path.join(self.autodir, 'results', jobtag) 62 63 if not cont: 64 if os.path.exists(self.tmpdir): 65 system('umount -f %s > /dev/null 2> /dev/null'%\ 66 self.tmpdir, ignorestatus=True) 67 system('rm -rf ' + self.tmpdir) 68 os.mkdir(self.tmpdir) 69 70 results = os.path.join(self.autodir, 'results') 71 if not os.path.exists(results): 72 os.mkdir(results) 73 74 download = os.path.join(self.testdir, 'download') 75 if os.path.exists(download): 76 system('rm -rf ' + download) 77 os.mkdir(download) 78 79 if os.path.exists(self.resultdir): 80 system('rm -rf ' + self.resultdir) 81 os.mkdir(self.resultdir) 82 83 os.mkdir(os.path.join(self.resultdir, 'debug')) 84 os.mkdir(os.path.join(self.resultdir, 'analysis')) 85 os.mkdir(os.path.join(self.resultdir, 'sysinfo')) 86 87 shutil.copyfile(control, os.path.join(self.resultdir, 'control')) 88 89 self.control = control 90 self.jobtab = jobtag 91 92 self.stdout = fd_stack.fd_stack(1, sys.stdout) 93 self.stderr = fd_stack.fd_stack(2, sys.stderr) 94 self.record_prefix = '' 95 96 self.config = config.config(self) 97 98 self.harness = harness.select(harness_type, self) 99 100 self.profilers = profilers.profilers(self) 101 102 try: 103 tool = self.config_get('boottool.executable') 104 self.bootloader = boottool.boottool(tool) 105 except: 106 pass 107 108 pwd = os.getcwd() 109 os.chdir(os.path.join(self.resultdir, 'sysinfo')) 110 system(os.path.join(self.bindir, 'sysinfo.py')) 111 system('dmesg -c > dmesg', ignorestatus=1) 112 os.chdir(pwd) 113 114 self.harness.run_start() 115 116 117 def relative_path(self, path): 118 """\ 119 Return a patch relative to the job results directory 120 """ 121 head = len(self.resultdir) + 1 # remove the / inbetween 122 return path[head:] 123 124 125 def control_get(self): 126 return self.control 127 128 129 def harness_select(self, which): 130 self.harness = harness.select(which, self) 131 132 133 def config_set(self, name, value): 134 self.config.set(name, value) 135 136 137 def config_get(self, name): 138 return self.config.get(name) 139 140 def setup_dirs(self, results_dir, tmp_dir): 141 if not tmp_dir: 142 tmp_dir = os.path.join(self.tmpdir, 'build') 143 if not os.path.exists(tmp_dir): 144 os.mkdir(tmp_dir) 145 if not os.path.isdir(tmp_dir): 146 raise "Temp dir (%s) is not a dir - args backwards?" \ 147 % self.tmpdir 148 149 # We label the first build "build" and then subsequent ones 150 # as "build.2", "build.3", etc. Whilst this is a little bit 151 # inconsistent, 99.9% of jobs will only have one build 152 # (that's not done as kernbench, sparse, or buildtest), 153 # so it works out much cleaner. One of life's comprimises. 154 if not results_dir: 155 results_dir = os.path.join(self.resultdir, 'build') 156 i = 2 157 while os.path.exists(results_dir): 158 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 159 i += 1 160 if not os.path.exists(results_dir): 161 os.mkdir(results_dir) 162 163 return (results_dir, tmp_dir) 164 165 166 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 167 kjob = None ): 168 """Summon a xen object""" 169 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 170 build_dir = 'xen' 171 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob) 172 173 174 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 175 """Summon a kernel object""" 176 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 177 build_dir = 'linux' 178 return kernel.kernel(self, base_tree, results_dir, tmp_dir, build_dir, leave) 179 180 181 def barrier(self, *args): 182 """Create a barrier object""" 183 return barrier.barrier(*args) 184 185 186 def setup_dep(self, deps): 187 """Set up the dependencies for this test. 188 189 deps is a list of libraries required for this test. 190 """ 191 for dep in deps: 192 try: 193 os.chdir(os.path.join(self.autodir, 'deps', dep)) 194 system('./' + dep + '.py') 195 except: 196 error = "setting up dependency " + dep + "\n" 197 raise UnhandledError(error) 198 199 200 def __runtest(self, url, tag, args, dargs): 201 try: 202 test.runtest(self, url, tag, args, dargs) 203 except AutotestError: 204 raise 205 except: 206 raise UnhandledError('running test ' + \ 207 self.__class__.__name__ + "\n") 208 209 210 def runtest(self, tag, url, *args): 211 raise "Deprecated call to job.runtest. Use run_test instead" 212 213 214 def run_test(self, url, *args, **dargs): 215 """Summon a test object and run it. 216 217 tag 218 tag to add to testname 219 url 220 url of the test to run 221 """ 222 223 if not url: 224 raise "Test name is invalid. Switched arguments?" 225 (group, testname) = test.testname(url) 226 tag = None 227 subdir = testname 228 if dargs.has_key('tag'): 229 tag = dargs['tag'] 230 del dargs['tag'] 231 if tag: 232 subdir += '.' + tag 233 try: 234 try: 235 self.__runtest(url, tag, args, dargs) 236 except Exception, detail: 237 self.record('FAIL', subdir, testname, \ 238 detail.__str__()) 239 240 raise 241 else: 242 self.record('GOOD', subdir, testname, \ 243 'completed successfully') 244 except TestError: 245 return 0 246 except: 247 raise 248 else: 249 return 1 250 251 252 def run_group(self, function, *args): 253 """\ 254 function: 255 subroutine to run 256 *args: 257 arguments for the function 258 """ 259 260 result = None 261 name = function.__name__ 262 # if tag: 263 # name += '.' + tag 264 old_record_prefix = self.record_prefix 265 try: 266 try: 267 self.record('START', None, name) 268 self.record_prefix += '\t' 269 result = function(*args) 270 self.record_prefix = old_record_prefix 271 self.record('END GOOD', None, name) 272 except: 273 self.record_prefix = old_record_prefix 274 self.record('END FAIL', None, name, format_error()) 275 # We don't want to raise up an error higher if it's just 276 # a TestError - we want to carry on to other tests. Hence 277 # this outer try/except block. 278 except TestError: 279 pass 280 except: 281 raise TestError(name + ' failed\n' + format_error()) 282 283 return result 284 285 286 # Check the passed kernel identifier against the command line 287 # and the running kernel, abort the job on missmatch. 288 def kernel_check_ident(self, expected_when, expected_id, subdir): 289 print "POST BOOT: checking booted kernel mark=%d identity='%s'" \ 290 % (expected_when, expected_id) 291 292 running_id = running_os_ident() 293 294 cmdline = read_one_line("/proc/cmdline") 295 296 find_sum = re.compile(r'.*IDENT=(\d+)') 297 m = find_sum.match(cmdline) 298 cmdline_when = -1 299 if m: 300 cmdline_when = int(m.groups()[0]) 301 302 # We have all the facts, see if they indicate we 303 # booted the requested kernel or not. 304 bad = False 305 if expected_id != running_id: 306 print "check_kernel_ident: kernel identifier mismatch" 307 bad = True 308 if expected_when != cmdline_when: 309 print "check_kernel_ident: kernel command line mismatch" 310 bad = True 311 312 if bad: 313 print " Expected Ident: " + expected_id 314 print " Running Ident: " + running_id 315 print " Expected Mark: %d" % (expected_when) 316 print "Command Line Mark: %d" % (cmdline_when) 317 print " Command Line: " + cmdline 318 319 raise JobError("boot failure") 320 321 self.record('GOOD', subdir, 'boot', 'boot successful') 322 323 324 def filesystem(self, device, mountpoint = None, loop_size = 0): 325 if not mountpoint: 326 mountpoint = self.tmpdir 327 return filesystem.filesystem(self, device, mountpoint,loop_size) 328 329 330 def reboot(self, tag='autotest'): 331 self.harness.run_reboot() 332 self.bootloader.boot_once(tag) 333 system("reboot") 334 self.quit() 335 336 337 def noop(self, text): 338 print "job: noop: " + text 339 340 341 # Job control primatives. 342 343 def __parallel_execute(self, func, *args): 344 func(*args) 345 346 347 def parallel(self, *tasklist): 348 """Run tasks in parallel""" 349 350 pids = [] 351 for task in tasklist: 352 pids.append(fork_start(self.resultdir, 353 lambda: self.__parallel_execute(*task))) 354 for pid in pids: 355 fork_waitfor(self.resultdir, pid) 356 357 358 def quit(self): 359 # XXX: should have a better name. 360 self.harness.run_pause() 361 raise JobContinue("more to come") 362 363 364 def complete(self, status): 365 """Clean up and exit""" 366 # We are about to exit 'complete' so clean up the control file. 367 try: 368 os.unlink(self.control + '.state') 369 except: 370 pass 371 self.harness.run_complete() 372 sys.exit(status) 373 374 375 steps = [] 376 def next_step(self, step): 377 """Define the next step""" 378 if not isinstance(step[0], basestring): 379 step[0] = step[0].__name__ 380 self.steps.append(step) 381 pickle.dump(self.steps, open(self.control + '.state', 'w')) 382 383 384 def next_step_prepend(self, step): 385 """Insert a new step, executing first""" 386 if not isinstance(step[0], basestring): 387 step[0] = step[0].__name__ 388 self.steps.insert(0, step) 389 pickle.dump(self.steps, open(self.control + '.state', 'w')) 390 391 392 def step_engine(self): 393 """the stepping engine -- if the control file defines 394 step_init we will be using this engine to drive multiple runs. 395 """ 396 """Do the next step""" 397 lcl = dict({'job': self}) 398 399 str = """ 400from error import * 401from autotest_utils import * 402""" 403 exec(str, lcl, lcl) 404 execfile(self.control, lcl, lcl) 405 406 state = self.control + '.state' 407 # If there is a mid-job state file load that in and continue 408 # where it indicates. Otherwise start stepping at the passed 409 # entry. 410 try: 411 self.steps = pickle.load(open(state, 'r')) 412 except: 413 if lcl.has_key('step_init'): 414 self.next_step([lcl['step_init']]) 415 416 # Run the step list. 417 while len(self.steps) > 0: 418 step = self.steps.pop(0) 419 pickle.dump(self.steps, open(state, 'w')) 420 421 cmd = step.pop(0) 422 lcl['__args'] = step 423 exec(cmd + "(*__args)", lcl, lcl) 424 425 426 def record(self, status_code, subdir, operation, status = ''): 427 """ 428 Record job-level status 429 430 The intent is to make this file both machine parseable and 431 human readable. That involves a little more complexity, but 432 really isn't all that bad ;-) 433 434 Format is <status code>\t<subdir>\t<operation>\t<status> 435 436 status code: (GOOD|WARN|FAIL|ABORT) 437 or START 438 or END (GOOD|WARN|FAIL|ABORT) 439 440 subdir: MUST be a relevant subdirectory in the results, 441 or None, which will be represented as '----' 442 443 operation: description of what you ran (e.g. "dbench", or 444 "mkfs -t foobar /dev/sda9") 445 446 status: error message or "completed sucessfully" 447 448 ------------------------------------------------------------ 449 450 Initial tabs indicate indent levels for grouping, and is 451 governed by self.record_prefix 452 453 multiline messages have secondary lines prefaced by a double 454 space (' ') 455 """ 456 457 if subdir: 458 if re.match(r'[\n\t]', subdir): 459 raise "Invalid character in subdir string" 460 substr = subdir 461 else: 462 substr = '----' 463 464 if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \ 465 status_code): 466 raise "Invalid status code supplied: %s" % status_code 467 if re.match(r'[\n\t]', operation): 468 raise "Invalid character in operation string" 469 operation = operation.rstrip() 470 status = status.rstrip() 471 status = re.sub(r"\t", " ", status) 472 # Ensure any continuation lines are marked so we can 473 # detect them in the status file to ensure it is parsable. 474 status = re.sub(r"\n", "\n" + self.record_prefix + " ", status) 475 476 msg = '%s\t%s\t%s\t%s' %(status_code, substr, operation, status) 477 478 self.harness.test_status_detail(status_code, substr, 479 operation, status) 480 self.harness.test_status(msg) 481 print msg 482 status_file = os.path.join(self.resultdir, 'status') 483 open(status_file, "a").write(self.record_prefix + msg + "\n") 484 if subdir: 485 status_file = os.path.join(self.resultdir, subdir, 'status') 486 open(status_file, "a").write(msg + "\n") 487 488 489def runjob(control, cont = False, tag = "default", harness_type = ''): 490 """The main interface to this module 491 492 control 493 The control file to use for this job. 494 cont 495 Whether this is the continuation of a previously started job 496 """ 497 control = os.path.abspath(control) 498 state = control + '.state' 499 500 # instantiate the job object ready for the control file. 501 myjob = None 502 try: 503 # Check that the control file is valid 504 if not os.path.exists(control): 505 raise JobError(control + ": control file not found") 506 507 # When continuing, the job is complete when there is no 508 # state file, ensure we don't try and continue. 509 if cont and not os.path.exists(state): 510 sys.exit(1) 511 if cont == False and os.path.exists(state): 512 os.unlink(state) 513 514 myjob = job(control, tag, cont, harness_type) 515 516 # Load in the users control file, may do any one of: 517 # 1) execute in toto 518 # 2) define steps, and select the first via next_step() 519 myjob.step_engine() 520 521 except JobContinue: 522 sys.exit(5) 523 524 except JobError, instance: 525 print "JOB ERROR: " + instance.args[0] 526 if myjob != None: 527 myjob.record('ABORT', None, instance.args[0]) 528 myjob.complete(1) 529 530 except: 531 if myjob: 532 myjob.harness.run_abort() 533 # Ensure we cannot continue this job, it is in rictus. 534 if os.path.exists(state): 535 os.unlink(state) 536 raise 537 538 # If we get here, then we assume the job is complete and good. 539 myjob.complete(0) 540 541