job.py revision 736adc930ee15e5bade91e2cf2c996ccf8bbd3eb
1"""The main job wrapper 2 3This is the core infrastructure. 4""" 5 6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006""" 7 8# standard stuff 9import os, sys, re, pickle, shutil 10# autotest stuff 11from autotest_utils import * 12from parallel import * 13import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool 14import harness, config 15 16class job: 17 """The actual job against which we do everything. 18 19 Properties: 20 autodir 21 The top level autotest directory (/usr/local/autotest). 22 Comes from os.environ['AUTODIR']. 23 bindir 24 <autodir>/bin/ 25 testdir 26 <autodir>/tests/ 27 profdir 28 <autodir>/profilers/ 29 tmpdir 30 <autodir>/tmp/ 31 resultdir 32 <autodir>/results/<jobtag> 33 stdout 34 fd_stack object for stdout 35 stderr 36 fd_stack object for stderr 37 profilers 38 the profilers object for this job 39 harness 40 the server harness object for this job 41 config 42 the job configuration for this job 43 """ 44 45 def __init__(self, control, jobtag, cont, harness_type=None): 46 """ 47 control 48 The control file (pathname of) 49 jobtag 50 The job tag string (eg "default") 51 cont 52 If this is the continuation of this job 53 harness_type 54 An alternative server harness 55 """ 56 self.autodir = os.environ['AUTODIR'] 57 self.bindir = os.path.join(self.autodir, 'bin') 58 self.testdir = os.path.join(self.autodir, 'tests') 59 self.profdir = os.path.join(self.autodir, 'profilers') 60 self.tmpdir = os.path.join(self.autodir, 'tmp') 61 self.resultdir = os.path.join(self.autodir, 'results', jobtag) 62 63 if not cont: 64 if os.path.exists(self.tmpdir): 65 system('umount -f %s > /dev/null 2> /dev/null'%\ 66 self.tmpdir, ignorestatus=True) 67 system('rm -rf ' + self.tmpdir) 68 os.mkdir(self.tmpdir) 69 70 results = os.path.join(self.autodir, 'results') 71 if not os.path.exists(results): 72 os.mkdir(results) 73 74 download = os.path.join(self.testdir, 'download') 75 if os.path.exists(download): 76 system('rm -rf ' + download) 77 os.mkdir(download) 78 79 if os.path.exists(self.resultdir): 80 system('rm -rf ' + self.resultdir) 81 os.mkdir(self.resultdir) 82 83 os.mkdir(os.path.join(self.resultdir, 'debug')) 84 os.mkdir(os.path.join(self.resultdir, 'analysis')) 85 os.mkdir(os.path.join(self.resultdir, 'sysinfo')) 86 87 shutil.copyfile(control, os.path.join(self.resultdir, 'control')) 88 89 self.control = control 90 self.jobtab = jobtag 91 92 self.stdout = fd_stack.fd_stack(1, sys.stdout) 93 self.stderr = fd_stack.fd_stack(2, sys.stderr) 94 self.record_prefix = '' 95 96 self.config = config.config(self) 97 98 self.harness = harness.select(harness_type, self) 99 100 self.profilers = profilers.profilers(self) 101 102 try: 103 tool = self.config_get('boottool.executable') 104 self.bootloader = boottool.boottool(tool) 105 except: 106 pass 107 108 pwd = os.getcwd() 109 os.chdir(os.path.join(self.resultdir, 'sysinfo')) 110 system(os.path.join(self.bindir, 'sysinfo.py')) 111 system('dmesg -c > dmesg', ignorestatus=1) 112 os.chdir(pwd) 113 114 self.harness.run_start() 115 116 117 def relative_path(self, path): 118 """\ 119 Return a patch relative to the job results directory 120 """ 121 head = len(self.resultdir) + 1 # remove the / inbetween 122 return path[head:] 123 124 125 def control_get(self): 126 return self.control 127 128 129 def harness_select(self, which): 130 self.harness = harness.select(which, self) 131 132 133 def config_set(self, name, value): 134 self.config.set(name, value) 135 136 137 def config_get(self, name): 138 return self.config.get(name) 139 140 
    def setup_dirs(self, results_dir, tmp_dir):
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            raise JobError("Temp dir (%s) is not a dir - args backwards?"
                           % tmp_dir)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir,
                                           'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def xen(self, base_tree, results_dir='', tmp_dir='', leave=False,
            kjob=None):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir,
                       build_dir, leave, kjob)


    def kernel(self, base_tree, results_dir='', tmp_dir='', leave=False):
        """Summon a kernel object"""
        if base_tree.endswith('.rpm'):
            return kernel.rpm_kernel(self, base_tree, results_dir)
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.kernel(self, base_tree, results_dir, tmp_dir,
                             build_dir, leave)


    def barrier(self, *args):
        """Create a barrier object"""
        return barrier.barrier(*args)


    def setup_dep(self, deps):
        """Set up the dependencies for this test.

        deps is a list of libraries required for this test.
        """
        for dep in deps:
            try:
                os.chdir(os.path.join(self.autodir, 'deps', dep))
                system('./' + dep + '.py')
            except:
                error = "setting up dependency " + dep + "\n"
                raise UnhandledError(error)


    def __runtest(self, url, tag, args, dargs):
        try:
            test.runtest(self, url, tag, args, dargs)
        except AutotestError:
            raise
        except:
            raise UnhandledError('running test ' +
                                 self.__class__.__name__ + "\n")


    def runtest(self, tag, url, *args):
        raise JobError("Deprecated call to job.runtest. "
                       "Use run_test instead")


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

        url
            url of the test to run
        tag (optional keyword argument)
            tag to append to the test name in the results

        Remaining arguments are passed through to the test itself.
        """

        if not url:
            raise JobError("Test name is invalid. Switched arguments?")
        (group, testname) = test.testname(url)
        tag = None
        subdir = testname
        if dargs.has_key('tag'):
            tag = dargs['tag']
            del dargs['tag']
        if tag:
            subdir += '.' + tag
        try:
            try:
                self.__runtest(url, tag, args, dargs)
            except Exception, detail:
                self.record('FAIL', subdir, testname,
                            detail.__str__())
                raise
            else:
                self.record('GOOD', subdir, testname,
                            'completed successfully')
        except TestError:
            return 0
        except:
            raise
        else:
            return 1

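    # Illustrative control-file usage of run_test() (a sketch; the
    # 'kernbench' test and its arguments are hypothetical examples):
    #
    #     job.run_test('kernbench', 2, 5)                # -> kernbench/
    #     job.run_test('kernbench', 2, 5, tag='quick')   # -> kernbench.quick/
    #
    # The optional 'tag' keyword is stripped from dargs and used to
    # suffix the results subdirectory; everything else is handed
    # through to the test itself.
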
    def run_group(self, function, *args, **dargs):
        """\
        function:
            subroutine to run
        *args:
            arguments for the function
        """

        result = None
        name = function.__name__

        # Allow the tag for the group to be specified.
        if dargs.has_key('tag'):
            tag = dargs['tag']
            del dargs['tag']
            if tag:
                name = tag

        # if tag:
        #     name += '.' + tag
        old_record_prefix = self.record_prefix
        try:
            try:
                self.record('START', None, name)
                self.record_prefix += '\t'
                result = function(*args, **dargs)
                self.record_prefix = old_record_prefix
                self.record('END GOOD', None, name)
            except:
                self.record_prefix = old_record_prefix
                self.record('END FAIL', None, name, format_error())
                raise
        # We don't want to raise up an error higher if it's just
        # a TestError - we want to carry on to other tests. Hence
        # this outer try/except block.
        except TestError:
            pass
        except:
            raise TestError(name + ' failed\n' + format_error())

        return result


    # Check the passed kernel identifier against the command line
    # and the running kernel; abort the job on mismatch.
    def kernel_check_ident(self, expected_when, expected_id, subdir):
        print "POST BOOT: checking booted kernel mark=%d identity='%s'" \
            % (expected_when, expected_id)

        running_id = running_os_ident()

        cmdline = read_one_line("/proc/cmdline")

        find_sum = re.compile(r'.*IDENT=(\d+)')
        m = find_sum.match(cmdline)
        cmdline_when = -1
        if m:
            cmdline_when = int(m.groups()[0])

        # We have all the facts, see if they indicate we
        # booted the requested kernel or not.
        bad = False
        if expected_id != running_id:
            print "kernel_check_ident: kernel identifier mismatch"
            bad = True
        if expected_when != cmdline_when:
            print "kernel_check_ident: kernel command line mismatch"
            bad = True

        if bad:
            print "   Expected Ident: " + expected_id
            print "    Running Ident: " + running_id
            print "    Expected Mark: %d" % (expected_when)
            print "Command Line Mark: %d" % (cmdline_when)
            print "     Command Line: " + cmdline

            raise JobError("boot failure")

        self.record('GOOD', subdir, 'boot', 'boot successful')


    def filesystem(self, device, mountpoint=None, loop_size=0):
        if not mountpoint:
            mountpoint = self.tmpdir
        return filesystem.filesystem(self, device, mountpoint, loop_size)


    def reboot(self, tag='autotest'):
        self.harness.run_reboot()
        default = self.config_get('boot.set_default')
        if default:
            self.bootloader.set_default(tag)
        else:
            self.bootloader.boot_once(tag)
        system("reboot")
        self.quit()


    def noop(self, text):
        print "job: noop: " + text


    # Job control primitives.

    def __parallel_execute(self, func, *args):
        func(*args)


    def parallel(self, *tasklist):
        """Run tasks in parallel"""

        pids = []
        for task in tasklist:
            pids.append(fork_start(self.resultdir,
                                   lambda: self.__parallel_execute(*task)))
        for pid in pids:
            fork_waitfor(self.resultdir, pid)


    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise JobContinue("more to come")

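    # Illustrative use of parallel() (a sketch; the test names are
    # hypothetical). Each task is a [callable, arg, ...] list, and each
    # runs in its own forked child via fork_start():
    #
    #     job.parallel([job.run_test, 'dbench'],
    #                  [job.run_test, 'bonnie'])
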
    def complete(self, status):
        """Clean up and exit"""
        # We are about to exit 'complete' so clean up the control file.
        try:
            os.unlink(self.control + '.state')
        except:
            pass
        self.harness.run_complete()
        sys.exit(status)


    steps = []
    def next_step(self, step):
        """Define the next step"""
        if not isinstance(step[0], basestring):
            step[0] = step[0].__name__
        self.steps.append(step)
        pickle.dump(self.steps, open(self.control + '.state', 'w'))


    def next_step_prepend(self, step):
        """Insert a new step, executing first"""
        if not isinstance(step[0], basestring):
            step[0] = step[0].__name__
        self.steps.insert(0, step)
        pickle.dump(self.steps, open(self.control + '.state', 'w'))


    def step_engine(self):
        """The stepping engine -- if the control file defines step_init
        we will be using this engine to drive multiple runs.
        """
        # Set up the environment the control file will execute in,
        # then pull the whole control file into it.
        lcl = dict({'job': self})

        str = """
from error import *
from autotest_utils import *
"""
        exec(str, lcl, lcl)
        execfile(self.control, lcl, lcl)

        state = self.control + '.state'
        # If there is a mid-job state file load that in and continue
        # where it indicates. Otherwise start stepping at the passed
        # entry.
        try:
            self.steps = pickle.load(open(state, 'r'))
        except:
            if lcl.has_key('step_init'):
                self.next_step([lcl['step_init']])

        # Run the step list, re-saving the remainder before each step
        # so an interruption (e.g. a reboot) can resume cleanly.
        while len(self.steps) > 0:
            step = self.steps.pop(0)
            pickle.dump(self.steps, open(state, 'w'))

            cmd = step.pop(0)
            lcl['__args'] = step
            exec(cmd + "(*__args)", lcl, lcl)

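    # Illustrative stepped control file driven by step_engine() (a
    # sketch; the kernel tarball path and test name are hypothetical,
    # and build()/boot() are assumed methods of the kernel object).
    # next_step() pickles the remaining steps to <control>.state, so
    # after the reboot the job resumes at step_test:
    #
    #     def step_init():
    #         job.next_step([step_test])
    #         testkernel = job.kernel('/usr/src/linux-2.6.18.tar.bz2')
    #         testkernel.build()
    #         testkernel.boot()
    #
    #     def step_test():
    #         job.run_test('kernbench')
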
    def record(self, status_code, subdir, operation, status=''):
        """
        Record job-level status

        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)

        Format is <status code>\t<subdir>\t<operation>\t<status>

        status code: (GOOD|WARN|FAIL|ABORT)
            or  START
            or  END (GOOD|WARN|FAIL|ABORT)

        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'

        operation: description of what you ran (e.g. "dbench", or
        "mkfs -t foobar /dev/sda9")

        status: error message or "completed successfully"

        ------------------------------------------------------------

        Initial tabs indicate indent levels for grouping and are
        governed by self.record_prefix

        Multiline messages have secondary lines prefaced by a double
        space ('  ')
        """

        if subdir:
            if re.search(r'[\n\t]', subdir):
                raise JobError("Invalid character in subdir string")
            substr = subdir
        else:
            substr = '----'

        if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$',
                        status_code):
            raise JobError("Invalid status code supplied: %s"
                           % status_code)
        if re.search(r'[\n\t]', operation):
            raise JobError("Invalid character in operation string")
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        msg = '%s\t%s\t%s\t%s' % (status_code, substr, operation, status)

        self.harness.test_status_detail(status_code, substr,
                                        operation, status)
        self.harness.test_status(msg)
        print msg
        status_file = os.path.join(self.resultdir, 'status')
        open(status_file, "a").write(self.record_prefix + msg + "\n")
        if subdir:
            status_file = os.path.join(self.resultdir, subdir, 'status')
            open(status_file, "a").write(msg + "\n")


def runjob(control, cont=False, tag="default", harness_type=''):
    """The main interface to this module

    control
        The control file to use for this job.
    cont
        Whether this is the continuation of a previously started job
    tag
        The job tag string (e.g. "default")
    harness_type
        An alternative server harness
    """
    control = os.path.abspath(control)
    state = control + '.state'

    # Instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file; ensure we don't try and continue.
        if cont and not os.path.exists(state):
            sys.exit(1)
        if not cont and os.path.exists(state):
            os.unlink(state)

        myjob = job(control, tag, cont, harness_type)

        # Load in the user's control file, may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except JobContinue:
        sys.exit(5)

    except JobError, instance:
        print "JOB ERROR: " + instance.args[0]
        if myjob != None:
            myjob.record('ABORT', None, instance.args[0])
            myjob.complete(1)

    except:
        if myjob:
            myjob.harness.run_abort()
        # Ensure we cannot continue this job; it is dead.
        if os.path.exists(state):
            os.unlink(state)
        raise

    # If we get here, then we assume the job is complete and good.
    myjob.complete(0)

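# Illustrative direct invocation of runjob() (a sketch; in practice the
# autotest wrapper script supplies these arguments):
#
#     from job import runjob
#     runjob('/usr/local/autotest/control', cont=False, tag='default')
#
# Exit codes follow from the code above: 5 via JobContinue when a
# reboot is pending, 1 via complete(1) on a JobError, and 0 via
# complete(0) on success.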