job.py revision 237bed32e0110ccd0db10823df742534dd7dc50d
1"""The main job wrapper
2
3This is the core infrastructure.
4"""
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""
7
8# standard stuff
9import os, sys, re, pickle, shutil
10# autotest stuff
11from autotest_utils import *
12from parallel import *
13import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool
14import harness, config
15
class job:
	"""The actual job against which we do everything.

	Properties:
		autodir
			The top level autotest directory (/usr/local/autotest).
			Comes from os.environ['AUTODIR'].
		bindir
			<autodir>/bin/
		testdir
			<autodir>/tests/
		profdir
			<autodir>/profilers/
		tmpdir
			<autodir>/tmp/
		resultdir
			<autodir>/results/<jobtag>
		stdout
			fd_stack object for stdout
		stderr
			fd_stack object for stderr
		profilers
			the profilers object for this job
		harness
			the server harness object for this job
		config
			the job configuration for this job
	"""

	def __init__(self, control, jobtag, cont, harness_type=None):
		"""
			control
				The control file (pathname of)
			jobtag
				The job tag string (eg "default")
			cont
				If this is the continuation of this job
			harness_type
				An alternative server harness
		"""
		self.autodir = os.environ['AUTODIR']
		self.bindir = self.autodir + '/bin'
		self.testdir = self.autodir + '/tests'
		self.profdir = self.autodir + '/profilers'
		self.tmpdir = self.autodir + '/tmp'
		self.resultdir = self.autodir + '/results/' + jobtag

		# A fresh run starts from clean tmp and result directories;
		# a continuation must leave the existing ones untouched.
		if not cont:
			if os.path.exists(self.tmpdir):
				system('rm -rf ' + self.tmpdir)
			os.mkdir(self.tmpdir)

			if not os.path.exists(self.autodir + '/results'):
				os.mkdir(self.autodir + '/results')

			if os.path.exists(self.resultdir):
				system('rm -rf ' + self.resultdir)
			os.mkdir(self.resultdir)

			os.mkdir(self.resultdir + "/debug")
			os.mkdir(self.resultdir + "/analysis")
			os.mkdir(self.resultdir + "/sysinfo")
			# Preserve the control file alongside the results.
			shutil.copyfile(control, self.resultdir + "/control")

		self.control = control
		# NOTE(review): 'jobtab' looks like a typo for 'jobtag';
		# confirm nothing reads job.jobtab before renaming it.
		self.jobtab = jobtag

		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)
		# Indentation prefix prepended by record() to nest status lines.
		self.record_prefix = ''

		self.config = config.config(self)

		self.harness = harness.select(harness_type, self)

		self.profilers = profilers.profilers(self)

		# Best effort: only set up a bootloader when boottool is
		# configured.  On any failure self.bootloader stays unset,
		# so reboot() would then raise AttributeError.
		try:
			tool = self.config_get('boottool.executable')
			self.bootloader = boottool.boottool(tool)
		except:
			pass

		# Snapshot system information (and drain the kernel ring
		# buffer) into resultdir/sysinfo, restoring the old cwd.
		pwd = os.getcwd()
		os.chdir(self.resultdir + "/sysinfo")
		system(self.bindir + '/sysinfo.py')
		system('dmesg -c > dmesg', ignorestatus=1)
		os.chdir(pwd)

		self.harness.run_start()


	def relative_path(self, path):
		"""\
		Return a path relative to the job results directory
		"""
		head = len(self.resultdir) + 1     # remove the / inbetween
		return path[head:]


	def control_get(self):
		"""Return the pathname of this job's control file."""
		return self.control


	def harness_select(self, which):
		"""Switch this job over to a different server harness."""
		self.harness = harness.select(which, self)


	def config_set(self, name, value):
		"""Store a job configuration value under name."""
		self.config.set(name, value)


	def config_get(self, name):
		"""Look up a job configuration value by name."""
		return self.config.get(name)

	def setup_dirs(self, results_dir, tmp_dir):
		"""Choose (and create) the build results and temp dirs.

		Either argument may be empty/None, in which case a default
		under tmpdir/resultdir is picked.  Returns the tuple
		(results_dir, tmp_dir).
		"""
		if not tmp_dir:
			tmp_dir = self.tmpdir + '/build'
		if not os.path.exists(tmp_dir):
			os.mkdir(tmp_dir)
		if not os.path.isdir(tmp_dir):
			# NOTE(review): string exceptions were removed in
			# Python 2.6; this should become a real exception
			# class.  Also the message interpolates self.tmpdir,
			# not the offending tmp_dir.
			raise "Temp dir (%s) is not a dir - args backwards?" \
								% self.tmpdir

		# We label the first build "build" and then subsequent ones
		# as "build.2", "build.3", etc. Whilst this is a little bit
		# inconsistent, 99.9% of jobs will only have one build
		# (that's not done as kernbench, sparse, or buildtest),
		# so it works out much cleaner. One of life's comprimises.
		if not results_dir:
			results_dir = os.path.join(self.resultdir, 'build')
			i = 2
			while os.path.exists(results_dir):
				results_dir = os.path.join(self.resultdir, 'build.%d' % i)
				i += 1
		if not os.path.exists(results_dir):
			os.mkdir(results_dir)

		return (results_dir, tmp_dir)


	def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
				kjob = None ):
		"""Summon a xen object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'xen'
		return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob)


	def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
		"""Summon a kernel object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'linux'
		return kernel.kernel(self, base_tree, results_dir, tmp_dir, build_dir, leave)


	def barrier(self, *args):
		"""Create a barrier object"""
		return barrier.barrier(*args)


	def setup_dep(self, deps):
		"""Set up the dependencies for this test.

		deps is a list of libraries required for this test.
		"""
		for dep in deps:
			try:
				# Each dependency lives in deps/<dep> and is
				# built/installed by its own <dep>.py script.
				os.chdir(self.autodir + '/deps/' + dep)
				system('./' + dep + '.py')
			except:
				error = "setting up dependency " + dep + "\n"
				raise UnhandledError(error)


	def __runtest(self, url, tag, args, dargs):
		"""Run one test, wrapping unexpected failures in UnhandledError.

		AutotestError subclasses pass through untouched so callers
		can distinguish test failures from infrastructure errors.
		"""
		try:
			test.runtest(self, url, tag, args, dargs)
		except AutotestError:
			raise
		except:
			raise UnhandledError('running test ' + \
				self.__class__.__name__ + "\n")


	def runtest(self, tag, url, *args):
		# NOTE(review): string exception; kept for the deprecation
		# message but should become a real exception class.
		raise "Deprecated call to job.runtest. Use run_test instead"


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		url
			url of the test to run
		tag (keyword, optional)
			tag to add to testname

		Records GOOD/FAIL status and returns 1 on success, 0 when
		the test raised a TestError; other exceptions propagate.
		"""

		if not url:
			# NOTE(review): string exception (removed in py2.6).
			raise "Test name is invalid. Switched arguments?"
		(group, name) = test.testname(url)
		tag = None
		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				name += '.' + tag
		try:
			try:
				self.__runtest(url, tag, args, dargs)
			except Exception, detail:
				# Record the failure, then re-raise so the
				# outer handler can classify it.
				self.record("FAIL " + name + " " + \
					detail.__str__() + "\n")

				raise
			else:
				self.record("GOOD " + name + \
					" completed successfully\n")
		except TestError:
			return 0
		except:
			raise
		else:
			return 1


	def run_group(self, function, *args):
		"""\
		function:
			subroutine to run
		*args:
			arguments for the function
		"""

		name = function.__name__
		# if tag:
		#	name += '.' + tag
		old_record_prefix = self.record_prefix
		try:
			try:
				self.record("START " + name)
				self.record_prefix += '\t'
				function(*args)
				self.record_prefix = old_record_prefix
				self.record("END %s GOOD" % name)
			# NOTE(review): this bare except swallows every error
			# from function() without re-raising, so the outer
			# handlers below only ever see failures from record()
			# itself -- confirm whether a re-raise was intended.
			except:
				self.record_prefix = old_record_prefix
				self.record("END %s FAIL" % name)
		# We don't want to raise up an error higher if it's just
		# a TestError - we want to carry on to other tests. Hence
		# this outer try/except block.
		except TestError:
			pass
		except:
			raise TestError(name + ' failed\n' + format_error())


	def filesystem(self, device, mountpoint = None, loop_size = 0):
		"""Summon a filesystem object for device, defaulting the
		mountpoint to the job tmpdir."""
		if not mountpoint:
			mountpoint = self.tmpdir
		return filesystem.filesystem(self, device, mountpoint,loop_size)


	def reboot(self, tag='autotest'):
		"""Reboot the machine into the boot entry named tag.

		Notifies the harness, arms a one-shot boot entry, issues
		the reboot and then quit()s so the job can be continued
		after the machine comes back.
		"""
		self.harness.run_reboot()
		# NOTE(review): self.bootloader is only set when boottool
		# configuration succeeded in __init__ -- confirm reboot()
		# is never reached otherwise.
		self.bootloader.boot_once(tag)
		system("reboot")
		self.quit()


	def noop(self, text):
		"""Do nothing; just log the given text (useful as a step)."""
		print "job: noop: " + text


	# Job control primatives.

	def __parallel_execute(self, func, *args):
		# Trampoline used by parallel() inside each forked child.
		func(*args)


	def parallel(self, *tasklist):
		"""Run tasks in parallel"""

		# Fork one child per task, then wait for them all; each
		# task is a (callable, args...) tuple.
		pids = []
		for task in tasklist:
			pids.append(fork_start(self.resultdir,
					lambda: self.__parallel_execute(*task)))
		for pid in pids:
			fork_waitfor(self.resultdir, pid)


	def quit(self):
		# XXX: should have a better name.
		# Pause the harness and unwind to runjob() via JobContinue,
		# which exits with status 5 (job to be continued later).
		self.harness.run_pause()
		raise JobContinue("more to come")


	def complete(self, status):
		"""Clean up and exit"""
		# We are about to exit 'complete' so clean up the control file.
		# Best effort: the .state file may legitimately not exist.
		try:
			os.unlink(self.control + '.state')
		except:
			pass
		self.harness.run_complete()
		sys.exit(status)


	# NOTE(review): class-level mutable list -- shared across
	# instances until step_engine() rebinds self.steps; confirm only
	# one job instance ever exists per process.
	steps = []
	def next_step(self, step):
		"""Define the next step"""
		# Steps are pickled by function *name* (resolved against the
		# control file's namespace on replay), not by reference.
		step[0] = step[0].__name__
		self.steps.append(step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def next_step_prepend(self, step):
		"""Insert a new step, executing first"""
		step[0] = step[0].__name__
		self.steps.insert(0, step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def step_engine(self):
		"""the stepping engine -- if the control file defines
		step_init we will be using this engine to drive multiple runs.
		"""
		"""Do the next step"""
		# Build the namespace the control file executes in.
		lcl = dict({'job': self})

		# NOTE(review): 'str' shadows the builtin for the rest of
		# this method; harmless here but worth renaming.
		str = """
from error import *
from autotest_utils import *
"""
		exec(str, lcl, lcl)
		execfile(self.control, lcl, lcl)

		state = self.control + '.state'
		# If there is a mid-job state file load that in and continue
		# where it indicates.  Otherwise start stepping at the passed
		# entry.
		try:
			self.steps = pickle.load(open(state, 'r'))
		except:
			if lcl.has_key('step_init'):
				self.next_step([lcl['step_init']])

		# Run the step list.
		while len(self.steps) > 0:
			step = self.steps.pop(0)
			# Checkpoint the remaining steps *before* running this
			# one, so a reboot mid-step resumes at the next step.
			pickle.dump(self.steps, open(state, 'w'))

			# step is [function_name, args...]; resolve the name
			# in the control file's namespace and invoke it there.
			cmd = step.pop(0)
			cmd = lcl[cmd]
			lcl['__cmd'] = cmd
			lcl['__args'] = step
			exec("__cmd(*__args)", lcl, lcl)


	def record(self, msg):
		"""Record job-level status"""

		msg = msg.rstrip()
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		msg = re.sub(r"\n", "\n" + self.record_prefix + "  ", msg)
		msg = self.record_prefix + msg

		# Mirror the status line to the harness, stdout and the
		# per-job status file.
		self.harness.test_status(msg)
		print msg
		status = self.resultdir + "/status"
		file(status, "a").write(msg + "\n")
388
389
390def runjob(control, cont = False, tag = "default", harness_type = ''):
391	"""The main interface to this module
392
393	control
394		The control file to use for this job.
395	cont
396		Whether this is the continuation of a previously started job
397	"""
398	control = os.path.abspath(control)
399	state = control + '.state'
400
401	# instantiate the job object ready for the control file.
402	myjob = None
403	try:
404		# Check that the control file is valid
405		if not os.path.exists(control):
406			raise JobError(control + ": control file not found")
407
408		# When continuing, the job is complete when there is no
409		# state file, ensure we don't try and continue.
410		if cont and not os.path.exists(state):
411			sys.exit(1)
412		if cont == False and os.path.exists(state):
413			os.unlink(state)
414
415		myjob = job(control, tag, cont, harness_type)
416
417		# Load in the users control file, may do any one of:
418		#  1) execute in toto
419		#  2) define steps, and select the first via next_step()
420		myjob.step_engine()
421
422	except JobContinue:
423		sys.exit(5)
424
425	except JobError, instance:
426		print "JOB ERROR: " + instance.args[0]
427		if myjob != None:
428			myjob.record("ABORT " + instance.args[0] + "\n")
429			myjob.complete(1)
430
431	except:
432		if myjob:
433			myjob.harness.run_abort()
434		# Ensure we cannot continue this job, it is in rictus.
435		if os.path.exists(state):
436			os.unlink(state)
437		raise
438
439	# If we get here, then we assume the job is complete and good.
440	myjob.complete(0)
441
442