job.py revision adff6ca0cb1fd0eb9d715fb45ec33708a02579ba
1"""The main job wrapper 2 3This is the core infrastructure. 4""" 5 6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006""" 7 8# standard stuff 9import os, sys, re, pickle, shutil, time, traceback 10# autotest stuff 11from autotest_utils import * 12from parallel import * 13from common.error import * 14from common import barrier 15import kernel, xen, test, profilers, filesystem, fd_stack, boottool 16import harness, config 17import sysinfo 18import cpuset 19 20class job: 21 """The actual job against which we do everything. 22 23 Properties: 24 autodir 25 The top level autotest directory (/usr/local/autotest). 26 Comes from os.environ['AUTODIR']. 27 bindir 28 <autodir>/bin/ 29 testdir 30 <autodir>/tests/ 31 profdir 32 <autodir>/profilers/ 33 tmpdir 34 <autodir>/tmp/ 35 resultdir 36 <autodir>/results/<jobtag> 37 stdout 38 fd_stack object for stdout 39 stderr 40 fd_stack object for stderr 41 profilers 42 the profilers object for this job 43 harness 44 the server harness object for this job 45 config 46 the job configuration for this job 47 """ 48 49 DEFAULT_LOG_FILENAME = "status" 50 51 def __init__(self, control, jobtag, cont, harness_type=None): 52 """ 53 control 54 The control file (pathname of) 55 jobtag 56 The job tag string (eg "default") 57 cont 58 If this is the continuation of this job 59 harness_type 60 An alternative server harness 61 """ 62 self.autodir = os.environ['AUTODIR'] 63 self.bindir = os.path.join(self.autodir, 'bin') 64 self.testdir = os.path.join(self.autodir, 'tests') 65 self.profdir = os.path.join(self.autodir, 'profilers') 66 self.tmpdir = os.path.join(self.autodir, 'tmp') 67 self.resultdir = os.path.join(self.autodir, 'results', jobtag) 68 self.sysinfodir = os.path.join(self.resultdir, 'sysinfo') 69 self.control = os.path.abspath(control) 70 71 if not cont: 72 if os.path.exists(self.tmpdir): 73 system('umount -f %s > /dev/null 2> /dev/null'%\ 74 self.tmpdir, ignorestatus=True) 75 system('rm -rf ' + self.tmpdir) 76 os.mkdir(self.tmpdir) 77 78 results = os.path.join(self.autodir, 'results') 79 if not os.path.exists(results): 80 os.mkdir(results) 81 82 download = os.path.join(self.testdir, 'download') 83 if os.path.exists(download): 84 system('rm -rf ' + download) 85 os.mkdir(download) 86 87 if os.path.exists(self.resultdir): 88 system('rm -rf ' + self.resultdir) 89 os.mkdir(self.resultdir) 90 os.mkdir(self.sysinfodir) 91 92 os.mkdir(os.path.join(self.resultdir, 'debug')) 93 os.mkdir(os.path.join(self.resultdir, 'analysis')) 94 95 shutil.copyfile(self.control, 96 os.path.join(self.resultdir, 'control')) 97 98 self.control = control 99 self.jobtag = jobtag 100 self.log_filename = self.DEFAULT_LOG_FILENAME 101 102 self.stdout = fd_stack.fd_stack(1, sys.stdout) 103 self.stderr = fd_stack.fd_stack(2, sys.stderr) 104 self.group_level = 0 105 106 self.config = config.config(self) 107 108 self.harness = harness.select(harness_type, self) 109 110 self.profilers = profilers.profilers(self) 111 112 try: 113 tool = self.config_get('boottool.executable') 114 self.bootloader = boottool.boottool(tool) 115 except: 116 pass 117 118 sysinfo.log_per_reboot_data(self.sysinfodir) 119 120 if not cont: 121 self.record('START', None, None) 122 self.group_level = 1 123 124 self.harness.run_start() 125 126 127 def relative_path(self, path): 128 """\ 129 Return a patch relative to the job results directory 130 """ 131 head = len(self.resultdir) + 1 # remove the / inbetween 132 return path[head:] 133 134 135 def control_get(self): 136 return self.control 137 138 139 def control_set(self, control): 140 self.control = os.path.abspath(control) 141 142 143 def harness_select(self, which): 144 self.harness = harness.select(which, self) 145 146 147 def config_set(self, name, value): 148 self.config.set(name, value) 149 150 151 def config_get(self, name): 152 return self.config.get(name) 153 154 def setup_dirs(self, results_dir, tmp_dir): 155 if not tmp_dir: 156 tmp_dir = os.path.join(self.tmpdir, 'build') 157 if not os.path.exists(tmp_dir): 158 os.mkdir(tmp_dir) 159 if not os.path.isdir(tmp_dir): 160 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir 161 raise ValueError(e_msg) 162 163 # We label the first build "build" and then subsequent ones 164 # as "build.2", "build.3", etc. Whilst this is a little bit 165 # inconsistent, 99.9% of jobs will only have one build 166 # (that's not done as kernbench, sparse, or buildtest), 167 # so it works out much cleaner. One of life's comprimises. 168 if not results_dir: 169 results_dir = os.path.join(self.resultdir, 'build') 170 i = 2 171 while os.path.exists(results_dir): 172 results_dir = os.path.join(self.resultdir, 'build.%d' % i) 173 i += 1 174 if not os.path.exists(results_dir): 175 os.mkdir(results_dir) 176 177 return (results_dir, tmp_dir) 178 179 180 def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \ 181 kjob = None ): 182 """Summon a xen object""" 183 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 184 build_dir = 'xen' 185 return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob) 186 187 188 def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False): 189 """Summon a kernel object""" 190 (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir) 191 build_dir = 'linux' 192 return kernel.auto_kernel(self, base_tree, results_dir, 193 tmp_dir, build_dir, leave) 194 195 196 def barrier(self, *args, **kwds): 197 """Create a barrier object""" 198 return barrier.barrier(*args, **kwds) 199 200 201 def setup_dep(self, deps): 202 """Set up the dependencies for this test. 203 204 deps is a list of libraries required for this test. 205 """ 206 for dep in deps: 207 try: 208 os.chdir(os.path.join(self.autodir, 'deps', dep)) 209 system('./' + dep + '.py') 210 except: 211 error = "setting up dependency " + dep + "\n" 212 raise UnhandledError(error) 213 214 215 def __runtest(self, url, tag, args, dargs): 216 try: 217 l = lambda : test.runtest(self, url, tag, args, dargs) 218 pid = fork_start(self.resultdir, l) 219 fork_waitfor(self.resultdir, pid) 220 except AutotestError: 221 raise 222 except: 223 raise UnhandledError('running test ' + \ 224 self.__class__.__name__ + "\n") 225 226 227 def run_test(self, url, *args, **dargs): 228 """Summon a test object and run it. 229 230 tag 231 tag to add to testname 232 url 233 url of the test to run 234 """ 235 236 if not url: 237 raise TypeError("Test name is invalid. Switched arguments?") 238 (group, testname) = test.testname(url) 239 tag = dargs.pop('tag', None) 240 self.container = None 241 container = dargs.pop('container', None) 242 subdir = testname 243 if tag: 244 subdir += '.' + tag 245 246 if container: 247 container_name = container.pop('container_name', None) 248 cpu = container.get('cpu', None) 249 root_container = container.get('root', 'sys') 250 if not container_name: 251 container_name = testname 252 if not grep('cpusets', '/proc/filesystems'): 253 254 self.container = cpuset.cpuset(container_name, 255 container['mem'], 256 os.getpid(), 257 root = root_container, 258 cpus = cpu) 259 # We are running in a container now... 260 261 def group_func(): 262 try: 263 self.__runtest(url, tag, args, dargs) 264 except Exception, detail: 265 self.record('FAIL', subdir, testname, 266 str(detail)) 267 raise 268 else: 269 self.record('GOOD', subdir, testname, 270 'completed successfully') 271 result, exc_info = self.__rungroup(subdir, group_func) 272 if self.container: 273 self.container.release() 274 self.container = None 275 276 if exc_info and isinstance(exc_info[1], TestError): 277 return False 278 elif exc_info: 279 raise exc_info[0], exc_info[1], exc_info[2] 280 else: 281 return True 282 283 284 def __rungroup(self, name, function, *args, **dargs): 285 """\ 286 name: 287 name of the group 288 function: 289 subroutine to run 290 *args: 291 arguments for the function 292 293 Returns a 2-tuple (result, exc_info) where result 294 is the return value of function, and exc_info is 295 the sys.exc_info() of the exception thrown by the 296 function (which may be None). 297 """ 298 299 result, exc_info = None, None 300 try: 301 self.record('START', None, name) 302 self.group_level += 1 303 result = function(*args, **dargs) 304 self.group_level -= 1 305 self.record('END GOOD', None, name) 306 except Exception, e: 307 exc_info = sys.exc_info() 308 self.group_level -= 1 309 err_msg = str(e) + '\n' + format_error() 310 self.record('END FAIL', None, name, err_msg) 311 312 return result, exc_info 313 314 315 def run_group(self, function, *args, **dargs): 316 """\ 317 function: 318 subroutine to run 319 *args: 320 arguments for the function 321 """ 322 323 # Allow the tag for the group to be specified 324 name = function.__name__ 325 tag = dargs.pop('tag', None) 326 if tag: 327 name = tag 328 329 result, exc_info = self.__rungroup(name, function, 330 *args, **dargs) 331 332 # if there was a non-TestError exception, raise it 333 if exc_info and not isinstance(exc_info[1], TestError): 334 err = ''.join(traceback.format_exception(*exc_info)) 335 raise TestError(name + ' failed\n' + err) 336 337 # pass back the actual return value from the function 338 return result 339 340 341 # Check the passed kernel identifier against the command line 342 # and the running kernel, abort the job on missmatch. 343 def kernel_check_ident(self, expected_when, expected_id, expected_cl, subdir, type = 'src'): 344 print "POST BOOT: checking booted kernel mark=%d identity='%s' changelist=%s type='%s'" \ 345 % (expected_when, expected_id, expected_cl, type) 346 347 running_id = running_os_ident() 348 349 cmdline = read_one_line("/proc/cmdline") 350 351 find_sum = re.compile(r'.*IDENT=(\d+)') 352 m = find_sum.match(cmdline) 353 cmdline_when = -1 354 if m: 355 cmdline_when = int(m.groups()[0]) 356 357 cl_re = re.compile(r'\d{7,}') 358 cl_match = cl_re.search(system_output('uname -v').split()[1]) 359 if cl_match: 360 current_cl = cl_match.group() 361 else: 362 current_cl = None 363 364 # We have all the facts, see if they indicate we 365 # booted the requested kernel or not. 366 bad = False 367 if (type == 'src' and expected_id != running_id or 368 type == 'rpm' and not running_id.startswith(expected_id + '::')): 369 print "check_kernel_ident: kernel identifier mismatch" 370 bad = True 371 if expected_when != cmdline_when: 372 print "check_kernel_ident: kernel command line mismatch" 373 bad = True 374 if expected_cl and current_cl and str(expected_cl) != current_cl: 375 print 'check_kernel_ident: kernel changelist mismatch' 376 bad = True 377 378 if bad: 379 print " Expected Ident: " + expected_id 380 print " Running Ident: " + running_id 381 print " Expected Mark: %d" % (expected_when) 382 print "Command Line Mark: %d" % (cmdline_when) 383 print " Expected P4 CL: %s" % expected_cl 384 print " P4 CL: %s" % current_cl 385 print " Command Line: " + cmdline 386 387 raise JobError("boot failure", "reboot.verify") 388 389 self.record('GOOD', subdir, 'reboot.verify') 390 391 392 def filesystem(self, device, mountpoint = None, loop_size = 0): 393 if not mountpoint: 394 mountpoint = self.tmpdir 395 return filesystem.filesystem(self, device, mountpoint,loop_size) 396 397 398 def reboot(self, tag='autotest'): 399 self.record('GOOD', None, 'reboot.start') 400 self.harness.run_reboot() 401 default = self.config_get('boot.set_default') 402 if default: 403 self.bootloader.set_default(tag) 404 else: 405 self.bootloader.boot_once(tag) 406 system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &") 407 self.quit() 408 409 410 def noop(self, text): 411 print "job: noop: " + text 412 413 414 def parallel(self, *tasklist): 415 """Run tasks in parallel""" 416 417 pids = [] 418 old_log_filename = self.log_filename 419 for i, task in enumerate(tasklist): 420 self.log_filename = old_log_filename + (".%d" % i) 421 task_func = lambda: task[0](*task[1:]) 422 pids.append(fork_start(self.resultdir, task_func)) 423 424 old_log_path = os.path.join(self.resultdir, old_log_filename) 425 old_log = open(old_log_path, "a") 426 exceptions = [] 427 for i, pid in enumerate(pids): 428 # wait for the task to finish 429 try: 430 fork_waitfor(self.resultdir, pid) 431 except Exception, e: 432 exceptions.append(e) 433 # copy the logs from the subtask into the main log 434 new_log_path = old_log_path + (".%d" % i) 435 if os.path.exists(new_log_path): 436 new_log = open(new_log_path) 437 old_log.write(new_log.read()) 438 new_log.close() 439 old_log.flush() 440 os.remove(new_log_path) 441 old_log.close() 442 443 self.log_filename = old_log_filename 444 445 # handle any exceptions raised by the parallel tasks 446 if exceptions: 447 msg = "%d task(s) failed" % len(exceptions) 448 raise JobError(msg, str(exceptions), exceptions) 449 450 451 def quit(self): 452 # XXX: should have a better name. 453 self.harness.run_pause() 454 raise JobContinue("more to come") 455 456 457 def complete(self, status): 458 """Clean up and exit""" 459 # We are about to exit 'complete' so clean up the control file. 460 try: 461 os.unlink(self.control + '.state') 462 except: 463 pass 464 self.harness.run_complete() 465 sys.exit(status) 466 467 468 steps = [] 469 def next_step(self, step): 470 """Define the next step""" 471 if not isinstance(step[0], basestring): 472 step[0] = step[0].__name__ 473 self.steps.append(step) 474 pickle.dump(self.steps, open(self.control + '.state', 'w')) 475 476 477 def next_step_prepend(self, step): 478 """Insert a new step, executing first""" 479 if not isinstance(step[0], basestring): 480 step[0] = step[0].__name__ 481 self.steps.insert(0, step) 482 pickle.dump(self.steps, open(self.control + '.state', 'w')) 483 484 485 def step_engine(self): 486 """the stepping engine -- if the control file defines 487 step_init we will be using this engine to drive multiple runs. 488 """ 489 """Do the next step""" 490 lcl = dict({'job': self}) 491 492 str = """ 493from common.error import * 494from autotest_utils import * 495""" 496 exec(str, lcl, lcl) 497 execfile(self.control, lcl, lcl) 498 499 state = self.control + '.state' 500 # If there is a mid-job state file load that in and continue 501 # where it indicates. Otherwise start stepping at the passed 502 # entry. 503 try: 504 self.steps = pickle.load(open(state, 'r')) 505 except: 506 if lcl.has_key('step_init'): 507 self.next_step([lcl['step_init']]) 508 509 # Run the step list. 510 while len(self.steps) > 0: 511 step = self.steps.pop(0) 512 pickle.dump(self.steps, open(state, 'w')) 513 514 cmd = step.pop(0) 515 lcl['__args'] = step 516 exec(cmd + "(*__args)", lcl, lcl) 517 518 519 def record(self, status_code, subdir, operation, status = ''): 520 """ 521 Record job-level status 522 523 The intent is to make this file both machine parseable and 524 human readable. That involves a little more complexity, but 525 really isn't all that bad ;-) 526 527 Format is <status code>\t<subdir>\t<operation>\t<status> 528 529 status code: (GOOD|WARN|FAIL|ABORT) 530 or START 531 or END (GOOD|WARN|FAIL|ABORT) 532 533 subdir: MUST be a relevant subdirectory in the results, 534 or None, which will be represented as '----' 535 536 operation: description of what you ran (e.g. "dbench", or 537 "mkfs -t foobar /dev/sda9") 538 539 status: error message or "completed sucessfully" 540 541 ------------------------------------------------------------ 542 543 Initial tabs indicate indent levels for grouping, and is 544 governed by self.group_level 545 546 multiline messages have secondary lines prefaced by a double 547 space (' ') 548 """ 549 550 if subdir: 551 if re.match(r'[\n\t]', subdir): 552 raise ValueError("Invalid character in subdir string") 553 substr = subdir 554 else: 555 substr = '----' 556 557 if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \ 558 status_code): 559 raise ValueError("Invalid status code supplied: %s" % status_code) 560 if not operation: 561 operation = '----' 562 if re.match(r'[\n\t]', operation): 563 raise ValueError("Invalid character in operation string") 564 operation = operation.rstrip() 565 status = status.rstrip() 566 status = re.sub(r"\t", " ", status) 567 # Ensure any continuation lines are marked so we can 568 # detect them in the status file to ensure it is parsable. 569 status = re.sub(r"\n", "\n" + "\t" * self.group_level + " ", status) 570 571 # Generate timestamps for inclusion in the logs 572 epoch_time = int(time.time()) # seconds since epoch, in UTC 573 local_time = time.localtime(epoch_time) 574 epoch_time_str = "timestamp=%d" % (epoch_time,) 575 local_time_str = time.strftime("localtime=%b %d %H:%M:%S", 576 local_time) 577 578 msg = '\t'.join(str(x) for x in (status_code, substr, operation, 579 epoch_time_str, local_time_str, 580 status)) 581 msg = '\t' * self.group_level + msg 582 583 msg_tag = "" 584 if "." in self.log_filename: 585 msg_tag = self.log_filename.split(".", 1)[1] 586 587 self.harness.test_status_detail(status_code, substr, operation, 588 status, msg_tag) 589 self.harness.test_status(msg, msg_tag) 590 591 # log to stdout (if enabled) 592 #if self.log_filename == self.DEFAULT_LOG_FILENAME: 593 print msg 594 595 # log to the "root" status log 596 status_file = os.path.join(self.resultdir, self.log_filename) 597 open(status_file, "a").write(msg + "\n") 598 599 # log to the subdir status log (if subdir is set) 600 if subdir: 601 dir = os.path.join(self.resultdir, subdir) 602 if not os.path.exists(dir): 603 os.mkdir(dir) 604 605 status_file = os.path.join(dir, 606 self.DEFAULT_LOG_FILENAME) 607 open(status_file, "a").write(msg + "\n") 608 609 610def runjob(control, cont = False, tag = "default", harness_type = ''): 611 """The main interface to this module 612 613 control 614 The control file to use for this job. 615 cont 616 Whether this is the continuation of a previously started job 617 """ 618 control = os.path.abspath(control) 619 state = control + '.state' 620 621 # instantiate the job object ready for the control file. 622 myjob = None 623 try: 624 # Check that the control file is valid 625 if not os.path.exists(control): 626 raise JobError(control + ": control file not found") 627 628 # When continuing, the job is complete when there is no 629 # state file, ensure we don't try and continue. 630 if cont and not os.path.exists(state): 631 raise JobComplete("all done") 632 if cont == False and os.path.exists(state): 633 os.unlink(state) 634 635 myjob = job(control, tag, cont, harness_type) 636 637 # Load in the users control file, may do any one of: 638 # 1) execute in toto 639 # 2) define steps, and select the first via next_step() 640 myjob.step_engine() 641 642 except JobContinue: 643 sys.exit(5) 644 645 except JobComplete: 646 sys.exit(1) 647 648 except JobError, instance: 649 print "JOB ERROR: " + instance.args[0] 650 if myjob: 651 command = None 652 if len(instance.args) > 1: 653 command = instance.args[1] 654 myjob.group_level = 0 655 myjob.record('ABORT', None, command, instance.args[0]) 656 myjob.record('END ABORT', None, None) 657 myjob.complete(1) 658 else: 659 sys.exit(1) 660 661 except Exception, e: 662 msg = str(e) + '\n' + format_error() 663 print "JOB ERROR: " + msg 664 if myjob: 665 myjob.group_level = 0 666 myjob.record('ABORT', None, None, msg) 667 myjob.record('END ABORT', None, None) 668 myjob.complete(1) 669 else: 670 sys.exit(1) 671 672 # If we get here, then we assume the job is complete and good. 673 myjob.group_level = 0 674 myjob.record('END GOOD', None, None) 675 myjob.complete(0) 676