job.py revision 237bed32e0110ccd0db10823df742534dd7dc50d
"""The main job wrapper

This is the core infrastructure.
"""

__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""

# standard stuff
import os, sys, re, pickle, shutil
# autotest stuff
from autotest_utils import *
from parallel import *
import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool
import harness, config


class job:
    """The actual job against which we do everything.

    Properties:
        autodir
            The top level autotest directory (/usr/local/autotest).
            Comes from os.environ['AUTODIR'].
        bindir
            <autodir>/bin/
        testdir
            <autodir>/tests/
        profdir
            <autodir>/profilers/
        tmpdir
            <autodir>/tmp/
        resultdir
            <autodir>/results/<jobtag>
        control
            pathname of the control file
        jobtag
            the job tag string (also available under the historical,
            misspelled name 'jobtab')
        stdout
            fd_stack object for stdout
        stderr
            fd_stack object for stderr
        record_prefix
            string prepended to every status line written by record()
        profilers
            the profilers object for this job
        harness
            the server harness object for this job
        config
            the job configuration for this job
        bootloader
            boottool object; only set if it could be created
        steps
            pending step list used by the step engine
    """

    def __init__(self, control, jobtag, cont, harness_type=None):
        """
            control
                The control file (pathname of)
            jobtag
                The job tag string (eg "default")
            cont
                If this is the continuation of this job
            harness_type
                An alternative server harness
        """
        self.autodir = os.environ['AUTODIR']
        self.bindir = self.autodir + '/bin'
        self.testdir = self.autodir + '/tests'
        self.profdir = self.autodir + '/profilers'
        self.tmpdir = self.autodir + '/tmp'
        self.resultdir = self.autodir + '/results/' + jobtag

        # A fresh (non-continuation) job starts from empty tmp and
        # result directories; a continuation reuses what is there.
        if not cont:
            if os.path.exists(self.tmpdir):
                system('rm -rf ' + self.tmpdir)
            os.mkdir(self.tmpdir)

            results_root = self.autodir + '/results'
            if not os.path.exists(results_root):
                os.mkdir(results_root)

            if os.path.exists(self.resultdir):
                system('rm -rf ' + self.resultdir)
            os.mkdir(self.resultdir)

            os.mkdir(self.resultdir + "/debug")
            os.mkdir(self.resultdir + "/analysis")
            os.mkdir(self.resultdir + "/sysinfo")
            shutil.copyfile(control, self.resultdir + "/control")

        self.control = control
        self.jobtag = jobtag
        # Historical misspelling, kept so existing users keep working.
        self.jobtab = jobtag

        # Give every job its own step list rather than sharing the
        # class-level one.
        self.steps = []

        self.stdout = fd_stack.fd_stack(1, sys.stdout)
        self.stderr = fd_stack.fd_stack(2, sys.stderr)
        self.record_prefix = ''

        self.config = config.config(self)

        self.harness = harness.select(harness_type, self)

        self.profilers = profilers.profilers(self)

        # Best effort: a missing or unusable boottool must not stop
        # the job from starting.
        # NOTE(review): on failure self.bootloader stays undefined, so
        # reboot() will fail with AttributeError.
        try:
            tool = self.config_get('boottool.executable')
            self.bootloader = boottool.boottool(tool)
        except:
            pass

        # Collect system information from inside the sysinfo directory,
        # always returning to the original working directory.
        pwd = os.getcwd()
        os.chdir(self.resultdir + "/sysinfo")
        try:
            system(self.bindir + '/sysinfo.py')
            system('dmesg -c > dmesg', ignorestatus=1)
        finally:
            os.chdir(pwd)

        self.harness.run_start()


    def relative_path(self, path):
        """\
        Return a path relative to the job results directory
        """
        head = len(self.resultdir) + 1     # remove the / in between
        return path[head:]


    def control_get(self):
        """Return the pathname of the control file."""
        return self.control


    def harness_select(self, which):
        """Replace the current harness with the one named by 'which'."""
        self.harness = harness.select(which, self)


    def config_set(self, name, value):
        """Set the job configuration entry 'name' to 'value'."""
        self.config.set(name, value)


    def config_get(self, name):
        """Return the job configuration entry 'name'."""
        return self.config.get(name)


    def setup_dirs(self, results_dir, tmp_dir):
        """Work out (and create) the directories for a build.

            results_dir
                where results go; if empty, the first unused of
                <resultdir>/build, build.2, build.3, ... is chosen
            tmp_dir
                scratch directory; if empty, <tmpdir>/build

        Returns the tuple (results_dir, tmp_dir).
        Raises ValueError if tmp_dir exists but is not a directory.
        """
        if not tmp_dir:
            tmp_dir = self.tmpdir + '/build'
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            raise ValueError(
                "Temp dir (%s) is not a dir - args backwards?" % tmp_dir)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)


    def xen(self, base_tree, results_dir='', tmp_dir='', leave=False,
            kjob=None):
        """Summon a xen object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'xen'
        return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
                       leave, kjob)


    def kernel(self, base_tree, results_dir='', tmp_dir='', leave=False):
        """Summon a kernel object"""
        (results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
        build_dir = 'linux'
        return kernel.kernel(self, base_tree, results_dir, tmp_dir,
                             build_dir, leave)


    def barrier(self, *args):
        """Create a barrier object"""
        return barrier.barrier(*args)


    def setup_dep(self, deps):
        """Set up the dependencies for this test.

        deps is a list of libraries required for this test.
        Each one is set up by running <autodir>/deps/<dep>/<dep>.py
        from inside that directory. Any failure is reported as an
        UnhandledError.
        """
        for dep in deps:
            try:
                os.chdir(self.autodir + '/deps/' + dep)
                system('./' + dep + '.py')
            except:
                error = "setting up dependency " + dep + "\n"
                raise UnhandledError(error)


    def __runtest(self, url, tag, args, dargs):
        """Run one test, turning unexpected errors into UnhandledError.

        AutotestError (and subclasses) pass through untouched.
        """
        try:
            test.runtest(self, url, tag, args, dargs)
        except AutotestError:
            raise
        except:
            # NOTE(review): self is the job here, so this message names
            # the job class rather than the test that failed.
            raise UnhandledError('running test ' +
                                 self.__class__.__name__ + "\n")


    def runtest(self, tag, url, *args):
        """Deprecated; always raises. Use run_test instead."""
        raise NotImplementedError(
            "Deprecated call to job.runtest. Use run_test instead")


    def run_test(self, url, *args, **dargs):
        """Summon a test object and run it.

            url
                url of the test to run
            tag (keyword only, optional)
                tag to add to testname

        Remaining positional and keyword arguments are handed to the
        test. Returns 1 on success and 0 if the test raised TestError;
        any other exception is recorded as a FAIL and re-raised.
        """

        if not url:
            raise ValueError("Test name is invalid. Switched arguments?")
        (group, name) = test.testname(url)
        # 'tag' is consumed here and not passed on to the test.
        tag = dargs.pop('tag', None)
        if tag:
            name += '.' + tag
        try:
            try:
                self.__runtest(url, tag, args, dargs)
            except Exception:
                detail = sys.exc_info()[1]
                self.record("FAIL " + name + " " + str(detail) + "\n")
                raise
            else:
                self.record("GOOD " + name + " completed successfully\n")
        except TestError:
            return 0
        return 1


    def run_group(self, function, *args):
        """\
        Run a function as a named group in the status log.

        function:
            subroutine to run
        *args:
            arguments for the function

        Status lines recorded while the function runs are indented by
        one extra tab between the START and END markers.
        """

        name = function.__name__
        old_record_prefix = self.record_prefix
        try:
            try:
                self.record("START " + name)
                self.record_prefix += '\t'
                function(*args)
                self.record_prefix = old_record_prefix
                self.record("END %s GOOD" % name)
            except:
                # NOTE(review): nothing is re-raised here, so the outer
                # handlers below only see errors raised by this handler
                # itself (i.e. by record()). Confirm this is intended.
                self.record_prefix = old_record_prefix
                self.record("END %s FAIL" % name)
        # We don't want to raise up an error higher if it's just
        # a TestError - we want to carry on to other tests. Hence
        # this outer try/except block.
        except TestError:
            pass
        except:
            raise TestError(name + ' failed\n' + format_error())


    def filesystem(self, device, mountpoint=None, loop_size=0):
        """Create a filesystem object; mountpoint defaults to tmpdir."""
        if not mountpoint:
            mountpoint = self.tmpdir
        return filesystem.filesystem(self, device, mountpoint, loop_size)


    def reboot(self, tag='autotest'):
        """Tell the harness, select boot entry 'tag' once, and reboot."""
        self.harness.run_reboot()
        self.bootloader.boot_once(tag)
        system("reboot")
        self.quit()


    def noop(self, text):
        """Print 'text' and do nothing else."""
        print("job: noop: " + text)


    # Job control primitives.

    def __parallel_execute(self, func, *args):
        """Call func(*args); the body of one parallel task."""
        func(*args)


    def parallel(self, *tasklist):
        """Run tasks in parallel

        Each task is a sequence whose first item is the callable and
        whose remaining items are its arguments. All tasks are started
        before any is waited for.
        """

        pids = []
        for task in tasklist:
            # Bind 'task' now so each callable keeps its own task
            # whenever fork_start gets round to calling it.
            pids.append(fork_start(
                self.resultdir,
                lambda task=task: self.__parallel_execute(*task)))
        for pid in pids:
            fork_waitfor(self.resultdir, pid)


    def quit(self):
        """Pause the harness and raise JobContinue."""
        # XXX: should have a better name.
        self.harness.run_pause()
        raise JobContinue("more to come")


    def complete(self, status):
        """Clean up and exit"""
        # We are about to exit 'complete' so clean up the control file.
        try:
            os.unlink(self.control + '.state')
        except OSError:
            # No state file (or it cannot be removed): nothing to do.
            pass
        self.harness.run_complete()
        sys.exit(status)


    # Class-level default; __init__ gives each job its own list.
    steps = []

    def _save_steps(self):
        """Write the pending step list to <control>.state."""
        state_file = open(self.control + '.state', 'w')
        try:
            pickle.dump(self.steps, state_file)
        finally:
            state_file.close()


    def next_step(self, step):
        """Define the next step

        step is a list: the step function followed by its arguments.
        The function is replaced in place by its name so the list can
        be pickled.
        """
        step[0] = step[0].__name__
        self.steps.append(step)
        self._save_steps()


    def next_step_prepend(self, step):
        """Insert a new step, executing first"""
        step[0] = step[0].__name__
        self.steps.insert(0, step)
        self._save_steps()


    def step_engine(self):
        """the stepping engine -- if the control file defines
        step_init we will be using this engine to drive multiple runs.

        Executes the control file, then runs the pending steps one at
        a time, saving the remaining steps before each one is run.
        """
        lcl = dict({'job': self})

        # Names made available to the control file.
        preamble = ("\n"
                    "from error import *\n"
                    "from autotest_utils import *\n")
        exec(preamble, lcl, lcl)
        execfile(self.control, lcl, lcl)

        state = self.control + '.state'
        # If there is a mid-job state file load that in and continue
        # where it indicates.  Otherwise start stepping at the passed
        # entry.
        # NOTE: the state file is unpickled as-is; it must only ever
        # come from this job's own earlier run.
        try:
            state_file = open(state, 'r')
            try:
                self.steps = pickle.load(state_file)
            finally:
                state_file.close()
        except:
            if 'step_init' in lcl:
                self.next_step([lcl['step_init']])

        # Run the step list.
        while len(self.steps) > 0:
            step = self.steps.pop(0)
            # Save what is left *before* running the step, so a reboot
            # in the middle of it resumes with the following step.
            self._save_steps()

            cmd = step.pop(0)
            cmd = lcl[cmd]
            lcl['__cmd'] = cmd
            lcl['__args'] = step
            exec("__cmd(*__args)", lcl, lcl)


    def record(self, msg):
        """Record job-level status

        The message is sent to the harness, printed, and appended to
        <resultdir>/status, prefixed with the current record_prefix.
        """

        msg = msg.rstrip()
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        # NOTE(review): confirm the width of the continuation indent
        # against the status file parser.
        msg = msg.replace("\n", "\n" + self.record_prefix + " ")
        msg = self.record_prefix + msg

        self.harness.test_status(msg)
        print(msg)
        status_file = open(self.resultdir + "/status", "a")
        try:
            status_file.write(msg + "\n")
        finally:
            status_file.close()


def runjob(control, cont=False, tag="default", harness_type=''):
    """The main interface to this module

    control
        The control file to use for this job.
    cont
        Whether this is the continuation of a previously started job
    tag
        The job tag; names the results directory.
    harness_type
        An alternative server harness

    This function does not return normally: it leaves via sys.exit()
    (status 0 on completion, 1 on a job error or when asked to continue
    a job with no state file, 5 when the job is to be continued) or by
    re-raising an unexpected exception.
    """
    control = os.path.abspath(control)
    state = control + '.state'

    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise JobError(control + ": control file not found")

        # When continuing, the job is complete when there is no
        # state file, ensure we don't try and continue.
        if cont and not os.path.exists(state):
            sys.exit(1)
        if not cont and os.path.exists(state):
            os.unlink(state)

        myjob = job(control, tag, cont, harness_type)

        # Load in the users control file, may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()

    except JobContinue:
        sys.exit(5)

    except JobError:
        instance = sys.exc_info()[1]
        print("JOB ERROR: " + instance.args[0])
        if myjob is None:
            # The error happened before the job object existed, so
            # there is nothing to record or complete.
            sys.exit(1)
        myjob.record("ABORT " + instance.args[0] + "\n")
        myjob.complete(1)

    except:
        if myjob:
            myjob.harness.run_abort()
        # Ensure we cannot continue this job, it is in rictus.
        if os.path.exists(state):
            os.unlink(state)
        raise

    # If we get here, then we assume the job is complete and good.
    myjob.complete(0)