job.py revision 08403caa1b9fd69b2ccb79b0c9fcb6e7e0b1913c
1"""The main job wrapper
2
3This is the core infrastructure.
4"""
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""
7
8# standard stuff
9import os, sys, re, pickle, shutil
10# autotest stuff
11from autotest_utils import *
12from parallel import *
13import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool
14import harness, config
15
class job:
	"""The actual job against which we do everything.

	Properties:
		autodir
			The top level autotest directory (/usr/local/autotest).
			Comes from os.environ['AUTODIR'].
		bindir
			<autodir>/bin/
		testdir
			<autodir>/tests/
		profdir
			<autodir>/profilers/
		tmpdir
			<autodir>/tmp/
		resultdir
			<autodir>/results/<jobtag>
		stdout
			fd_stack object for stdout
		stderr
			fd_stack object for stderr
		profilers
			the profilers object for this job
		harness
			the server harness object for this job
		config
			the job configuration for this job
	"""

	def __init__(self, control, jobtag, cont, harness_type=None):
		"""
			control
				Pathname of the control file
			jobtag
				The job tag string (e.g. "default")
			cont
				Whether this is the continuation of a previously
				started job
			harness_type
				An alternative server harness
		"""
		self.autodir = os.environ['AUTODIR']
		self.bindir = os.path.join(self.autodir, 'bin')
		self.testdir = os.path.join(self.autodir, 'tests')
		self.profdir = os.path.join(self.autodir, 'profilers')
		self.tmpdir = os.path.join(self.autodir, 'tmp')
		self.resultdir = os.path.join(self.autodir, 'results', jobtag)

		if not cont:
			if os.path.exists(self.tmpdir):
				system('umount -f %s > /dev/null 2> /dev/null' % \
						self.tmpdir, ignorestatus=True)
				system('rm -rf ' + self.tmpdir)
			os.mkdir(self.tmpdir)

			results = os.path.join(self.autodir, 'results')
			if not os.path.exists(results):
				os.mkdir(results)

			download = os.path.join(self.testdir, 'download')
			if os.path.exists(download):
				system('rm -rf ' + download)
			os.mkdir(download)

			if os.path.exists(self.resultdir):
				system('rm -rf ' + self.resultdir)
			os.mkdir(self.resultdir)

			os.mkdir(os.path.join(self.resultdir, 'debug'))
			os.mkdir(os.path.join(self.resultdir, 'analysis'))
			os.mkdir(os.path.join(self.resultdir, 'sysinfo'))

			shutil.copyfile(control, os.path.join(self.resultdir, 'control'))
		self.control = control
		self.jobtag = jobtag

		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)
		self.record_prefix = ''

		self.config = config.config(self)

		self.harness = harness.select(harness_type, self)

		self.profilers = profilers.profilers(self)

		try:
			tool = self.config_get('boottool.executable')
			self.bootloader = boottool.boottool(tool)
		except:
			pass

		pwd = os.getcwd()
		os.chdir(os.path.join(self.resultdir, 'sysinfo'))
		system(os.path.join(self.bindir, 'sysinfo.py'))
		system('dmesg -c > dmesg', ignorestatus=1)
		os.chdir(pwd)

		self.harness.run_start()


	def relative_path(self, path):
		"""\
		Return a path relative to the job results directory
		"""
		head = len(self.resultdir) + 1     # remove the / in between
		return path[head:]


	def control_get(self):
		return self.control


	def harness_select(self, which):
		self.harness = harness.select(which, self)


	def config_set(self, name, value):
		self.config.set(name, value)


	def config_get(self, name):
		return self.config.get(name)

	def setup_dirs(self, results_dir, tmp_dir):
		if not tmp_dir:
			tmp_dir = os.path.join(self.tmpdir, 'build')
		if not os.path.exists(tmp_dir):
			os.mkdir(tmp_dir)
		if not os.path.isdir(tmp_dir):
			raise JobError("Temp dir (%s) is not a dir - args backwards?"
								% tmp_dir)

		# We label the first build "build" and then subsequent ones
		# as "build.2", "build.3", etc. Whilst this is a little bit
		# inconsistent, 99.9% of jobs will only have one build
		# (other than those done by kernbench, sparse, or buildtest),
		# so it works out much cleaner. One of life's compromises.
		if not results_dir:
			results_dir = os.path.join(self.resultdir, 'build')
			i = 2
			while os.path.exists(results_dir):
				results_dir = os.path.join(self.resultdir, 'build.%d' % i)
				i += 1
		if not os.path.exists(results_dir):
			os.mkdir(results_dir)

		return (results_dir, tmp_dir)


	def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
				kjob = None):
		"""Summon a xen object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'xen'
		return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob)


	def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
		"""Summon a kernel object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'linux'
		return kernel.kernel(self, base_tree, results_dir, tmp_dir, build_dir, leave)
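
	# Illustrative sketch, not part of the original file: a control file
	# would typically obtain and drive a kernel object along these lines.
	# The tarball path and the kernel-object method names below are
	# assumptions about kernel.py, not verified here.
	#
	#	testkernel = job.kernel('/usr/local/src/linux-2.6.18.tar.bz2')
	#	testkernel.build()
	#	testkernel.boot()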


	def barrier(self, *args):
		"""Create a barrier object"""
		return barrier.barrier(*args)


	def setup_dep(self, deps):
		"""Set up the dependencies for this test.

		deps is a list of libraries required for this test.
		"""
		for dep in deps:
			try:
				os.chdir(os.path.join(self.autodir, 'deps', dep))
				system('./' + dep + '.py')
			except:
				error = "setting up dependency " + dep + "\n"
				raise UnhandledError(error)
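
	# Illustrative sketch, not part of the original file: a test's setup
	# might declare its library dependencies like this (the dependency
	# name 'libaio' is hypothetical), which would run
	# <autodir>/deps/libaio/libaio.py to build it:
	#
	#	job.setup_dep(['libaio'])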


	def __runtest(self, url, tag, args, dargs):
		try:
			test.runtest(self, url, tag, args, dargs)
		except AutotestError:
			raise
		except:
			raise UnhandledError('running test ' + url + "\n")


	def runtest(self, tag, url, *args):
		raise JobError("Deprecated call to job.runtest. Use run_test instead")


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		url
			url of the test to run
		tag
			optional keyword argument; tag to add to the test name

		Returns 1 on success, 0 on a TestError.
		"""

		if not url:
			raise JobError("Test name is invalid. Switched arguments?")
		(group, testname) = test.testname(url)
		tag = None
		subdir = testname
		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				subdir += '.' + tag
		try:
			try:
				self.__runtest(url, tag, args, dargs)
			except Exception, detail:
				self.record('FAIL', subdir, testname, \
							detail.__str__())

				raise
			else:
				self.record('GOOD', subdir, testname, \
						'completed successfully')
		except TestError:
			return 0
		except:
			raise
		else:
			return 1
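
	# Illustrative sketch, not part of the original file: a control file
	# would invoke a test like this (the test name 'sleeptest' and its
	# keyword arguments are assumptions, not checked against tests/):
	#
	#	job.run_test('sleeptest', tag='quick')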


	def run_group(self, function, *args):
		"""\
		function:
			subroutine to run
		*args:
			arguments for the function
		"""

		result = None
		name = function.__name__
		# if tag:
		#	name += '.' + tag
		old_record_prefix = self.record_prefix
		try:
			try:
				self.record('START', None, name)
				self.record_prefix += '\t'
				result = function(*args)
				self.record_prefix = old_record_prefix
				self.record('END GOOD', None, name)
			except:
				self.record_prefix = old_record_prefix
				self.record('END FAIL', None, name, format_error())
		# We don't want to raise up an error higher if it's just
		# a TestError - we want to carry on to other tests. Hence
		# this outer try/except block.
		except TestError:
			pass
		except:
			raise TestError(name + ' failed\n' + format_error())

		return result
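
	# Illustrative sketch, not part of the original file: grouping a few
	# related tests so they share an indented START/END block in the
	# status log (the function and test names here are hypothetical):
	#
	#	def fs_tests():
	#		job.run_test('fsx')
	#		job.run_test('bonnie')
	#
	#	job.run_group(fs_tests)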


	# Check the passed kernel identifier against the command line
	# and the running kernel; abort the job on mismatch.
	def kernel_check_ident(self, expected_when, expected_id, subdir):
		print "POST BOOT: checking booted kernel mark=%d identity='%s'" \
			% (expected_when, expected_id)

		running_id = running_os_ident()

		cmdline = read_one_line("/proc/cmdline")

		find_sum = re.compile(r'.*IDENT=(\d+)')
		m = find_sum.match(cmdline)
		cmdline_when = -1
		if m:
			cmdline_when = int(m.groups()[0])

		# We have all the facts, see if they indicate we
		# booted the requested kernel or not.
		bad = False
		if expected_id != running_id:
			print "kernel_check_ident: kernel identifier mismatch"
			bad = True
		if expected_when != cmdline_when:
			print "kernel_check_ident: kernel command line mismatch"
			bad = True

		if bad:
			print "   Expected Ident: " + expected_id
			print "    Running Ident: " + running_id
			print "    Expected Mark: %d" % (expected_when)
			print "Command Line Mark: %d" % (cmdline_when)
			print "     Command Line: " + cmdline

			raise JobError("boot failure")

		self.record('GOOD', subdir, 'boot', 'boot successful')


	def filesystem(self, device, mountpoint = None, loop_size = 0):
		if not mountpoint:
			mountpoint = self.tmpdir
		return filesystem.filesystem(self, device, mountpoint, loop_size)


	def reboot(self, tag='autotest'):
		self.harness.run_reboot()
		self.bootloader.boot_once(tag)
		system("reboot")
		self.quit()


	def noop(self, text):
		print "job: noop: " + text


	# Job control primitives.

	def __parallel_execute(self, func, *args):
		func(*args)


	def parallel(self, *tasklist):
		"""Run tasks in parallel"""

		pids = []
		for task in tasklist:
			pids.append(fork_start(self.resultdir,
					lambda: self.__parallel_execute(*task)))
		for pid in pids:
			fork_waitfor(self.resultdir, pid)
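
	# Illustrative sketch, not part of the original file: each task is a
	# (callable, args...) sequence run in its own forked child (the test
	# names are hypothetical):
	#
	#	job.parallel([job.run_test, 'dbench'], [job.run_test, 'tbench'])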


	def quit(self):
		# XXX: should have a better name.
		self.harness.run_pause()
		raise JobContinue("more to come")


	def complete(self, status):
		"""Clean up and exit"""
		# We are about to exit via complete(), so clean up the
		# control file's state file.
		try:
			os.unlink(self.control + '.state')
		except:
			pass
		self.harness.run_complete()
		sys.exit(status)


	steps = []
	def next_step(self, step):
		"""Define the next step"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.append(step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def next_step_prepend(self, step):
		"""Insert a new step, to be executed first"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.insert(0, step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))
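
	# Illustrative sketch, not part of the original file: a control file
	# can queue work to survive reboots by naming functions as steps
	# (the function and test names are hypothetical):
	#
	#	def run_sleeptest():
	#		job.run_test('sleeptest')
	#
	#	job.next_step([run_sleeptest])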


	def step_engine(self):
		"""The stepping engine -- if the control file defines
		step_init we will be using this engine to drive multiple runs.

		Execute the next step.
		"""
		lcl = {'job': self}

		preamble = """
from error import *
from autotest_utils import *
"""
		exec(preamble, lcl, lcl)
		execfile(self.control, lcl, lcl)

		state = self.control + '.state'
		# If there is a mid-job state file, load that in and continue
		# where it indicates.  Otherwise start stepping at the passed
		# entry.
		try:
			self.steps = pickle.load(open(state, 'r'))
		except:
			if lcl.has_key('step_init'):
				self.next_step([lcl['step_init']])

		# Run the step list.
		while len(self.steps) > 0:
			step = self.steps.pop(0)
			pickle.dump(self.steps, open(state, 'w'))

			cmd = step.pop(0)
			lcl['__args'] = step
			exec(cmd + "(*__args)", lcl, lcl)
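
	# Illustrative sketch, not part of the original file: a control file
	# that drives multiple runs (e.g. across a reboot) defines step_init
	# and queues further steps; the function and test names here are
	# hypothetical:
	#
	#	def step_init():
	#		job.next_step([step_test])
	#		job.reboot()
	#
	#	def step_test():
	#		job.run_test('sleeptest')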


	def record(self, status_code, subdir, operation, status = ''):
		"""
		Record job-level status

		The intent is to make this file both machine parseable and
		human readable. That involves a little more complexity, but
		really isn't all that bad ;-)

		Format is <status code>\t<subdir>\t<operation>\t<status>

		status code: (GOOD|WARN|FAIL|ABORT)
			or   START
			or   END (GOOD|WARN|FAIL|ABORT)

		subdir: MUST be a relevant subdirectory in the results,
		or None, which will be represented as '----'

		operation: description of what you ran (e.g. "dbench", or
						"mkfs -t foobar /dev/sda9")

		status: error message or "completed successfully"

		------------------------------------------------------------

		Initial tabs indicate indent levels for grouping, and are
		governed by self.record_prefix

		Multiline messages have secondary lines prefaced by a double
		space ('  ')
		"""

		if subdir:
			if re.match(r'[\n\t]', subdir):
				raise JobError("Invalid character in subdir string")
			substr = subdir
		else:
			substr = '----'

		if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
								status_code):
			raise JobError("Invalid status code supplied: %s" % status_code)
		if re.match(r'[\n\t]', operation):
			raise JobError("Invalid character in operation string")
		operation = operation.rstrip()
		status = status.rstrip()
		status = re.sub(r"\t", "  ", status)
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

		msg = '%s\t%s\t%s\t%s' % (status_code, substr, operation, status)

		self.harness.test_status_detail(status_code, substr,
							operation, status)
		self.harness.test_status(msg)
		print msg
		status_file = os.path.join(self.resultdir, 'status')
		open(status_file, "a").write(self.record_prefix + msg + "\n")
		if subdir:
			status_file = os.path.join(self.resultdir, subdir, 'status')
			open(status_file, "a").write(msg + "\n")
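
	# For example (illustrative, not part of the original file), a
	# successful dbench run would be recorded as a tab-separated line:
	#
	#	GOOD	dbench	dbench	completed successfully
	#
	# and lines recorded inside a run_group() are indented by one extra
	# tab via self.record_prefix.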


def runjob(control, cont = False, tag = "default", harness_type = ''):
	"""The main interface to this module

	control
		The control file to use for this job.
	cont
		Whether this is the continuation of a previously started job
	tag
		The job tag string (e.g. "default")
	harness_type
		An alternative server harness
	"""
	control = os.path.abspath(control)
	state = control + '.state'

	# instantiate the job object ready for the control file.
	myjob = None
	try:
		# Check that the control file is valid
		if not os.path.exists(control):
			raise JobError(control + ": control file not found")

		# When continuing, the job is complete when there is no
		# state file; make sure we don't try to continue in that case.
		if cont and not os.path.exists(state):
			sys.exit(1)
		if not cont and os.path.exists(state):
			os.unlink(state)

		myjob = job(control, tag, cont, harness_type)

		# Load in the user's control file; it may do any one of:
		#  1) execute in toto
		#  2) define steps, and select the first via next_step()
		myjob.step_engine()

	except JobContinue:
		sys.exit(5)

	except JobError, instance:
		print "JOB ERROR: " + instance.args[0]
		if myjob is not None:
			myjob.record('ABORT', None, instance.args[0])
			myjob.complete(1)

	except:
		if myjob:
			myjob.harness.run_abort()
		# Ensure we cannot continue this job, it is in rictus.
		if os.path.exists(state):
			os.unlink(state)
		raise

	# If we get here, then we assume the job is complete and good.
	myjob.complete(0)

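# Illustrative invocation sketch, not part of the original file (the
# control file path is hypothetical):
#
#	runjob('/usr/local/autotest/control', cont=False, tag='default')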
541