job.py revision c82f24696c6acb3efe618ce63c7de73185af0e14
1"""The main job wrapper
2
3This is the core infrastructure.
4"""
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""
7
8# standard stuff
9import os, sys, re, pickle, shutil
10# autotest stuff
11from autotest_utils import *
12from parallel import *
13import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool
14import harness, config
15
class job:
	"""The actual job against which we do everything.

	Properties:
		autodir
			The top level autotest directory (/usr/local/autotest).
			Comes from os.environ['AUTODIR'].
		bindir
			<autodir>/bin/
		testdir
			<autodir>/tests/
		profdir
			<autodir>/profilers/
		tmpdir
			<autodir>/tmp/
		resultdir
			<autodir>/results/<jobtag>
		stdout
			fd_stack object for stdout
		stderr
			fd_stack object for stderr
		profilers
			the profilers object for this job
		harness
			the server harness object for this job
		config
			the job configuration for this job
	"""

	def __init__(self, control, jobtag, cont, harness_type=None):
		"""
			control
				The control file (pathname of)
			jobtag
				The job tag string (eg "default")
			cont
				If this is the continuation of this job
			harness_type
				An alternative server harness
		"""
		self.autodir = os.environ['AUTODIR']
		self.bindir = os.path.join(self.autodir, 'bin')
		self.testdir = os.path.join(self.autodir, 'tests')
		self.profdir = os.path.join(self.autodir, 'profilers')
		self.tmpdir = os.path.join(self.autodir, 'tmp')
		self.resultdir = os.path.join(self.autodir, 'results', jobtag)

		if not cont:
			if os.path.exists(self.tmpdir):
				system('umount -f %s > /dev/null 2> /dev/null' %
						self.tmpdir, ignorestatus=True)
				system('rm -rf ' + self.tmpdir)
			os.mkdir(self.tmpdir)

			results = os.path.join(self.autodir, 'results')
			if not os.path.exists(results):
				os.mkdir(results)

			download = os.path.join(self.testdir, 'download')
			if os.path.exists(download):
				system('rm -rf ' + download)
			os.mkdir(download)

			if os.path.exists(self.resultdir):
				system('rm -rf ' + self.resultdir)
			os.mkdir(self.resultdir)

			os.mkdir(os.path.join(self.resultdir, 'debug'))
			os.mkdir(os.path.join(self.resultdir, 'analysis'))
			os.mkdir(os.path.join(self.resultdir, 'sysinfo'))

			shutil.copyfile(control, os.path.join(self.resultdir, 'control'))

		self.control = control
		self.jobtag = jobtag

		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)
		self.record_prefix = ''

		self.config = config.config(self)

		self.harness = harness.select(harness_type, self)

		self.profilers = profilers.profilers(self)

		try:
			tool = self.config_get('boottool.executable')
			self.bootloader = boottool.boottool(tool)
		except:
			pass

		pwd = os.getcwd()
		os.chdir(os.path.join(self.resultdir, 'sysinfo'))
		system(os.path.join(self.bindir, 'sysinfo.py'))
		system('dmesg -c > dmesg', ignorestatus=1)
		os.chdir(pwd)

		self.harness.run_start()


	def relative_path(self, path):
		"""\
		Return a path relative to the job results directory
		"""
		head = len(self.resultdir) + 1     # remove the / in between
		return path[head:]


	def control_get(self):
		return self.control


	def harness_select(self, which):
		self.harness = harness.select(which, self)


	def config_set(self, name, value):
		self.config.set(name, value)


	def config_get(self, name):
		return self.config.get(name)

	def setup_dirs(self, results_dir, tmp_dir):
		if not tmp_dir:
			tmp_dir = os.path.join(self.tmpdir, 'build')
		if not os.path.exists(tmp_dir):
			os.mkdir(tmp_dir)
		if not os.path.isdir(tmp_dir):
			raise JobError("Temp dir (%s) is not a dir - args backwards?"
								% tmp_dir)

		# We label the first build "build" and then subsequent ones
		# as "build.2", "build.3", etc. Whilst this is a little bit
		# inconsistent, 99.9% of jobs will only have one build
		# (that's not done as kernbench, sparse, or buildtest),
		# so it works out much cleaner. One of life's compromises.
		if not results_dir:
			results_dir = os.path.join(self.resultdir, 'build')
			i = 2
			while os.path.exists(results_dir):
				results_dir = os.path.join(self.resultdir, 'build.%d' % i)
				i += 1
		if not os.path.exists(results_dir):
			os.mkdir(results_dir)

		return (results_dir, tmp_dir)


	def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
				kjob = None ):
		"""Summon a xen object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'xen'
		return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob)


	def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
		"""Summon a kernel object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'linux'
		return kernel.kernel(self, base_tree, results_dir, tmp_dir, build_dir, leave)
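
	# Illustrative control-file usage (a sketch only -- the tarball path
	# is hypothetical, and build()/boot() are assumed to be the methods
	# provided by the kernel object in kernel.py):
	#
	#	testkernel = job.kernel('/path/to/linux-2.6.18.tar.bz2')
	#	testkernel.build()
	#	testkernel.boot()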


	def barrier(self, *args):
		"""Create a barrier object"""
		return barrier.barrier(*args)


	def setup_dep(self, deps):
		"""Set up the dependencies for this test.

		deps is a list of libraries required for this test.
		"""
		for dep in deps:
			try:
				os.chdir(os.path.join(self.autodir, 'deps', dep))
				system('./' + dep + '.py')
			except:
				error = "setting up dependency " + dep + "\n"
				raise UnhandledError(error)
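
	# Each dependency lives in <autodir>/deps/<dep>/ and is built by its
	# own <dep>.py script, so a test's setup might call something like
	# job.setup_dep(['libsomething']) -- the name here is purely
	# illustrative.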


	def __runtest(self, url, tag, args, dargs):
		try:
			test.runtest(self, url, tag, args, dargs)
		except AutotestError:
			raise
		except:
			raise UnhandledError('running test ' + url + "\n")


	def runtest(self, tag, url, *args):
		raise JobError("Deprecated call to job.runtest. Use run_test instead")


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		url
			url of the test to run
		tag
			tag to add to the test name (optional keyword argument)
		reraise
			if True, re-raise a TestError rather than swallowing it
			(optional keyword argument)
		"""

		if not url:
			raise JobError("Test name is invalid. Switched arguments?")
		(group, testname) = test.testname(url)
		tag = None
		subdir = testname
		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				subdir += '.' + tag

		reraise = False
		if dargs.has_key('reraise'):
			reraise = dargs['reraise']
			del dargs['reraise']

		try:
			try:
				self.__runtest(url, tag, args, dargs)
			except Exception, detail:
				self.record('FAIL', subdir, testname, \
							detail.__str__())

				raise
			else:
				self.record('GOOD', subdir, testname, \
						'completed successfully')
		except TestError:
			if reraise:
				raise
			else:
				return 0
		except:
			raise
		else:
			return 1
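
	# Illustrative control-file usage (a sketch; 'sleeptest' and the tag
	# are hypothetical):
	#
	#	job.run_test('sleeptest', tag='quick')
	#
	# which would run the test once and record its status under
	# <resultdir>/sleeptest.quick/.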


	def run_group(self, function, *args, **dargs):
		"""\
		function:
			subroutine to run
		*args:
			arguments for the function
		"""

		result = None
		name = function.__name__

		# Allow the tag for the group to be specified.
		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				name = tag

		# if tag:
		#	name += '.' + tag
		old_record_prefix = self.record_prefix
		try:
			try:
				self.record('START', None, name)
				self.record_prefix += '\t'
				result = function(*args, **dargs)
				self.record_prefix = old_record_prefix
				self.record('END GOOD', None, name)
			except:
				self.record_prefix = old_record_prefix
				self.record('END FAIL', None, name, format_error())
				raise
		# We don't want to raise up an error higher if it's just
		# a TestError - we want to carry on to other tests. Hence
		# this outer try/except block.
		except TestError:
			pass
		except:
			raise TestError(name + ' failed\n' + format_error())

		return result
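
	# Illustrative usage (a sketch; the helper function and test names
	# are hypothetical):
	#
	#	def all_filesystem_tests():
	#		job.run_test('fsx')
	#		job.run_test('iozone')
	#
	#	job.run_group(all_filesystem_tests, tag='fs')
	#
	# This brackets the group with START / END GOOD (or END FAIL) records
	# and indents the nested records by one tab.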


	# Check the passed kernel identifier against the command line
	# and the running kernel, abort the job on mismatch.
	def kernel_check_ident(self, expected_when, expected_id, subdir):
		print "POST BOOT: checking booted kernel mark=%d identity='%s'" \
			% (expected_when, expected_id)

		running_id = running_os_ident()

		cmdline = read_one_line("/proc/cmdline")

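		# The requested kernel is expected to have been booted with an
		# IDENT=<mark> token on its command line; recover that mark
		# here.  For example, a cmdline of "root=/dev/sda1 ro IDENT=1"
		# gives cmdline_when == 1 (the cmdline itself is illustrative).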
		find_sum = re.compile(r'.*IDENT=(\d+)')
		m = find_sum.match(cmdline)
		cmdline_when = -1
		if m:
			cmdline_when = int(m.groups()[0])

		# We have all the facts, see if they indicate we
		# booted the requested kernel or not.
		bad = False
		if expected_id != running_id:
			print "check_kernel_ident: kernel identifier mismatch"
			bad = True
		if expected_when != cmdline_when:
			print "check_kernel_ident: kernel command line mismatch"
			bad = True

		if bad:
			print "   Expected Ident: " + expected_id
			print "    Running Ident: " + running_id
			print "    Expected Mark: %d" % (expected_when)
			print "Command Line Mark: %d" % (cmdline_when)
			print "     Command Line: " + cmdline

			raise JobError("boot failure")

		self.record('GOOD', subdir, 'boot', 'boot successful')


	def filesystem(self, device, mountpoint = None, loop_size = 0):
		if not mountpoint:
			mountpoint = self.tmpdir
		return filesystem.filesystem(self, device, mountpoint, loop_size)


	def reboot(self, tag='autotest'):
		self.harness.run_reboot()
		self.bootloader.boot_once(tag)
		system("reboot")
		self.quit()


	def noop(self, text):
		print "job: noop: " + text


	# Job control primitives.

	def __parallel_execute(self, func, *args):
		func(*args)


	def parallel(self, *tasklist):
		"""Run tasks in parallel"""

		pids = []
		for task in tasklist:
			pids.append(fork_start(self.resultdir,
					lambda: self.__parallel_execute(*task)))
		for pid in pids:
			fork_waitfor(self.resultdir, pid)
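
	# Illustrative usage (a sketch; the test names are hypothetical):
	#
	#	job.parallel([job.run_test, 'dbench'],
	#		     [job.run_test, 'bonnie'])
	#
	# Each task is a sequence whose first element is a callable and whose
	# remaining elements are its arguments; each task runs in a forked
	# child.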


	def quit(self):
		# XXX: should have a better name.
		self.harness.run_pause()
		raise JobContinue("more to come")


	def complete(self, status):
		"""Clean up and exit"""
		# We are about to exit 'complete' so clean up the control
		# file's leftover state file.
		try:
			os.unlink(self.control + '.state')
		except:
			pass
		self.harness.run_complete()
		sys.exit(status)


	steps = []
	def next_step(self, step):
		"""Define the next step"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.append(step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def next_step_prepend(self, step):
		"""Insert a new step, executing first"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.insert(0, step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def step_engine(self):
		"""The stepping engine -- if the control file defines step_init
		we will be using this engine to drive multiple runs, doing one
		step at a time.
		"""
		lcl = dict({'job': self})

		preamble = """
from error import *
from autotest_utils import *
"""
		exec(preamble, lcl, lcl)
		execfile(self.control, lcl, lcl)

		state = self.control + '.state'
		# If there is a mid-job state file load that in and continue
		# where it indicates.  Otherwise start stepping at the passed
		# entry.
		try:
			self.steps = pickle.load(open(state, 'r'))
		except:
			if lcl.has_key('step_init'):
				self.next_step([lcl['step_init']])

		# Run the step list.
		while len(self.steps) > 0:
			step = self.steps.pop(0)
			pickle.dump(self.steps, open(state, 'w'))

			cmd = step.pop(0)
			lcl['__args'] = step
			exec(cmd + "(*__args)", lcl, lcl)
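
	# A control file driven by the stepping engine might look roughly
	# like this (a sketch only; the step and test names are
	# illustrative):
	#
	#	def step_init():
	#		job.next_step([step_one])
	#		job.reboot()
	#
	#	def step_one():
	#		job.run_test('sleeptest')
	#
	# Queued steps are pickled to <control>.state, so after the reboot
	# the engine resumes at step_one rather than at the beginning.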


	def record(self, status_code, subdir, operation, status = ''):
		"""
		Record job-level status

		The intent is to make this file both machine parsable and
		human readable. That involves a little more complexity, but
		really isn't all that bad ;-)

		Format is <status code>\t<subdir>\t<operation>\t<status>

		status code: (GOOD|WARN|FAIL|ABORT)
			or   START
			or   END (GOOD|WARN|FAIL|ABORT)

		subdir: MUST be a relevant subdirectory in the results,
		or None, which will be represented as '----'

		operation: description of what you ran (e.g. "dbench", or
						"mkfs -t foobar /dev/sda9")

		status: error message or "completed successfully"

		------------------------------------------------------------

		Initial tabs indicate indent levels for grouping, and are
		governed by self.record_prefix

		Multiline messages have secondary lines prefaced by a double
		space ('  ')
		"""
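		# For illustration, a successful test might produce a status
		# line like (tab separated):
		#
		#	GOOD	dbench	dbench	completed successfully
		#
		# nested one level inside a group, it would be prefixed with a
		# single tab from self.record_prefix.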

		if subdir:
			if re.search(r'[\n\t]', subdir):
				raise JobError("Invalid character in subdir string")
			substr = subdir
		else:
			substr = '----'

		if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
								status_code):
			raise JobError("Invalid status code supplied: %s" % status_code)
		if re.search(r'[\n\t]', operation):
			raise JobError("Invalid character in operation string")
		operation = operation.rstrip()
		status = status.rstrip()
		status = re.sub(r"\t", "  ", status)
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

		msg = '%s\t%s\t%s\t%s' % (status_code, substr, operation, status)

		self.harness.test_status_detail(status_code, substr,
							operation, status)
		self.harness.test_status(msg)
		print msg
		status_file = os.path.join(self.resultdir, 'status')
		open(status_file, "a").write(self.record_prefix + msg + "\n")
		if subdir:
			status_file = os.path.join(self.resultdir, subdir, 'status')
			open(status_file, "a").write(msg + "\n")


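# Illustrative use of runjob() (a sketch; the wrapper and path are
# hypothetical): an outer autotest wrapper calls something like
#
#	job.runjob('/usr/local/autotest/control', cont=False)
#
# and calls it again with cont=True after each reboot to resume the
# queued steps recorded in the control file's .state file.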
def runjob(control, cont = False, tag = "default", harness_type = ''):
	"""The main interface to this module

	control
		The control file to use for this job.
	cont
		Whether this is the continuation of a previously started job
	tag
		The job tag string (eg "default")
	harness_type
		An alternative server harness
	"""
	control = os.path.abspath(control)
	state = control + '.state'

	# instantiate the job object ready for the control file.
	myjob = None
	try:
		# Check that the control file is valid
		if not os.path.exists(control):
			raise JobError(control + ": control file not found")

		# When continuing, a missing state file means the job is
		# already complete; make sure we don't try to continue it.
		if cont and not os.path.exists(state):
			sys.exit(1)
		if not cont and os.path.exists(state):
			os.unlink(state)

		myjob = job(control, tag, cont, harness_type)

		# Load in the user's control file; it may do either of:
		#  1) execute in toto
		#  2) define steps, and select the first via next_step()
		myjob.step_engine()

	except JobContinue:
		sys.exit(5)

	except JobError, instance:
		print "JOB ERROR: " + instance.args[0]
		if myjob is not None:
			myjob.record('ABORT', None, instance.args[0])
			myjob.complete(1)

	except:
		if myjob:
			myjob.harness.run_abort()
		# Ensure we cannot continue this job, it is in rictus.
		if os.path.exists(state):
			os.unlink(state)
		raise

	# If we get here, then we assume the job is complete and good.
	myjob.complete(0)
