job.py revision 736adc930ee15e5bade91e2cf2c996ccf8bbd3eb
1"""The main job wrapper
2
3This is the core infrastructure.
4"""
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""
7
8# standard stuff
9import os, sys, re, pickle, shutil
10# autotest stuff
11from autotest_utils import *
12from parallel import *
13import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool
14import harness, config
15
class job:
	"""The actual job against which we do everything.

	Properties:
		autodir
			The top level autotest directory (/usr/local/autotest).
			Comes from os.environ['AUTODIR'].
		bindir
			<autodir>/bin/
		testdir
			<autodir>/tests/
		profdir
			<autodir>/profilers/
		tmpdir
			<autodir>/tmp/
		resultdir
			<autodir>/results/<jobtag>
		stdout
			fd_stack object for stdout
		stderr
			fd_stack object for stderr
		profilers
			the profilers object for this job
		harness
			the server harness object for this job
		config
			the job configuration for this job
	"""

	def __init__(self, control, jobtag, cont, harness_type=None):
		"""
			control
				Pathname of the job control file
			jobtag
				The job tag string (eg "default")
			cont
				Whether this is a continuation of a previous
				run of this job
			harness_type
				An alternative server harness
		"""
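		# A job is normally constructed for you by runjob() below;
		# a direct call would look like (hypothetical path):
		#   job('/usr/local/autotest/control', 'default', False)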
		self.autodir = os.environ['AUTODIR']
		self.bindir = os.path.join(self.autodir, 'bin')
		self.testdir = os.path.join(self.autodir, 'tests')
		self.profdir = os.path.join(self.autodir, 'profilers')
		self.tmpdir = os.path.join(self.autodir, 'tmp')
		self.resultdir = os.path.join(self.autodir, 'results', jobtag)

		if not cont:
			if os.path.exists(self.tmpdir):
				system('umount -f %s > /dev/null 2> /dev/null' %
						self.tmpdir, ignorestatus=True)
				system('rm -rf ' + self.tmpdir)
			os.mkdir(self.tmpdir)

			results = os.path.join(self.autodir, 'results')
			if not os.path.exists(results):
				os.mkdir(results)

			download = os.path.join(self.testdir, 'download')
			if os.path.exists(download):
				system('rm -rf ' + download)
			os.mkdir(download)

			if os.path.exists(self.resultdir):
				system('rm -rf ' + self.resultdir)
			os.mkdir(self.resultdir)

			os.mkdir(os.path.join(self.resultdir, 'debug'))
			os.mkdir(os.path.join(self.resultdir, 'analysis'))
			os.mkdir(os.path.join(self.resultdir, 'sysinfo'))

			shutil.copyfile(control, os.path.join(self.resultdir, 'control'))

		self.control = control
		self.jobtag = jobtag

		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)
		self.record_prefix = ''
		# steps queued for the stepping engine
		self.steps = []

		self.config = config.config(self)

		self.harness = harness.select(harness_type, self)

		self.profilers = profilers.profilers(self)

		# The bootloader is optional; ignore setup failures.
		try:
			tool = self.config_get('boottool.executable')
			self.bootloader = boottool.boottool(tool)
		except:
			pass

		# Capture a snapshot of the system state at job start.
		pwd = os.getcwd()
		os.chdir(os.path.join(self.resultdir, 'sysinfo'))
		system(os.path.join(self.bindir, 'sysinfo.py'))
		system('dmesg -c > dmesg', ignorestatus=1)
		os.chdir(pwd)

		self.harness.run_start()


	def relative_path(self, path):
		"""\
		Return a path relative to the job results directory
		"""
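		# A sketch of the intent, assuming resultdir is
		# '/usr/local/autotest/results/default':
		#   relative_path('/usr/local/autotest/results/default/build/log')
		# returns 'build/log'.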
		head = len(self.resultdir) + 1     # skip the '/' in between
		return path[head:]


	def control_get(self):
		return self.control


	def harness_select(self, which):
		self.harness = harness.select(which, self)


	def config_set(self, name, value):
		self.config.set(name, value)


	def config_get(self, name):
		return self.config.get(name)


	def setup_dirs(self, results_dir, tmp_dir):
		if not tmp_dir:
			tmp_dir = os.path.join(self.tmpdir, 'build')
		if not os.path.exists(tmp_dir):
			os.mkdir(tmp_dir)
		if not os.path.isdir(tmp_dir):
			raise ValueError("Temp dir (%s) is not a dir - args backwards?"
								% tmp_dir)

		# We label the first build "build" and then subsequent ones
		# as "build.2", "build.3", etc. Whilst this is a little
		# inconsistent, 99.9% of jobs will only have one build
		# (one that isn't kernbench, sparse, or buildtest), so it
		# works out much cleaner. One of life's compromises.
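		# For example, a job performing three builds would use the
		# directories build/, build.2/ and build.3/ in turn.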
		if not results_dir:
			results_dir = os.path.join(self.resultdir, 'build')
			i = 2
			while os.path.exists(results_dir):
				results_dir = os.path.join(self.resultdir,
							'build.%d' % i)
				i += 1
		if not os.path.exists(results_dir):
			os.mkdir(results_dir)

		return (results_dir, tmp_dir)


	def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False,
				kjob = None):
		"""Summon a xen object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'xen'
		return xen.xen(self, base_tree, results_dir, tmp_dir,
							build_dir, leave, kjob)


	def kernel(self, base_tree, results_dir = '', tmp_dir = '',
				leave = False):
		"""Summon a kernel object"""
		if base_tree.endswith('.rpm'):
			return kernel.rpm_kernel(self, base_tree, results_dir)
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'linux'
		return kernel.kernel(self, base_tree, results_dir, tmp_dir,
							build_dir, leave)


	def barrier(self, *args):
		"""Create a barrier object"""
		return barrier.barrier(*args)


	def setup_dep(self, deps):
		"""Set up the dependencies for this test.

		deps is a list of libraries required for this test.
		"""
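		# e.g. (hypothetical dependency name):
		#   job.setup_dep(['libaio'])
		# runs <autodir>/deps/libaio/libaio.py to build/install it.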
		for dep in deps:
			try:
				os.chdir(os.path.join(self.autodir, 'deps', dep))
				system('./' + dep + '.py')
			except:
				error = "setting up dependency " + dep + "\n"
				raise UnhandledError(error)


	def __runtest(self, url, tag, args, dargs):
		try:
			test.runtest(self, url, tag, args, dargs)
		except AutotestError:
			raise
		except:
			raise UnhandledError('running test ' + url + "\n")


	def runtest(self, tag, url, *args):
		raise JobError("Deprecated call to job.runtest. "
						"Use run_test instead")


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		url
			url of the test to run
		tag (optional keyword argument)
			tag to add to the test name
		"""
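		# Typical control file usage (test names are illustrative):
		#   job.run_test('sleeptest')
		#   job.run_test('dbench', tag='ext3')  # results in dbench.ext3/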

		if not url:
			raise ValueError("Test name is invalid. "
						"Switched arguments?")
		(group, testname) = test.testname(url)
		tag = None
		subdir = testname
		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				subdir += '.' + tag
		try:
			try:
				self.__runtest(url, tag, args, dargs)
			except Exception, detail:
				self.record('FAIL', subdir, testname,
							str(detail))

				raise
			else:
				self.record('GOOD', subdir, testname,
						'completed successfully')
		except TestError:
			return 0
		except:
			raise
		else:
			return 1


	def run_group(self, function, *args, **dargs):
		"""\
		function:
			subroutine to run
		*args:
			arguments for the function
		tag (optional keyword argument):
			name for the group in the status log
		"""
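		# A sketch of typical use from a control file (hypothetical
		# function name):
		#   def quick_tests():
		#       job.run_test('sleeptest')
		#   job.run_group(quick_tests, tag='quick')
		# brackets the runs with START / "END ..." status records.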

		result = None
		name = function.__name__

		# Allow the tag for the group to be specified.
		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				name = tag

		old_record_prefix = self.record_prefix
		try:
			try:
				self.record('START', None, name)
				self.record_prefix += '\t'
				result = function(*args, **dargs)
				self.record_prefix = old_record_prefix
				self.record('END GOOD', None, name)
			except:
				self.record_prefix = old_record_prefix
				self.record('END FAIL', None, name, format_error())
		# We don't want to raise up an error higher if it's just
		# a TestError - we want to carry on to other tests. Hence
		# this outer try/except block.
		except TestError:
			pass
		except:
			raise TestError(name + ' failed\n' + format_error())

		return result


	# Check the passed kernel identifier against the command line
	# and the running kernel, abort the job on mismatch.
	def kernel_check_ident(self, expected_when, expected_id, subdir):
		print "POST BOOT: checking booted kernel mark=%d identity='%s'" \
			% (expected_when, expected_id)

		running_id = running_os_ident()

		cmdline = read_one_line("/proc/cmdline")

		find_ident = re.compile(r'.*IDENT=(\d+)')
		m = find_ident.match(cmdline)
		cmdline_when = -1
		if m:
			cmdline_when = int(m.groups()[0])
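		# For example, a command line of (illustrative):
		#   root=/dev/sda2 ro IDENT=4
		# yields cmdline_when == 4; no IDENT= leaves it at -1.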

		# We have all the facts, see if they indicate we
		# booted the requested kernel or not.
		bad = False
		if expected_id != running_id:
			print "kernel_check_ident: kernel identifier mismatch"
			bad = True
		if expected_when != cmdline_when:
			print "kernel_check_ident: kernel command line mismatch"
			bad = True

		if bad:
			print "   Expected Ident: " + expected_id
			print "    Running Ident: " + running_id
			print "    Expected Mark: %d" % (expected_when)
			print "Command Line Mark: %d" % (cmdline_when)
			print "     Command Line: " + cmdline

			raise JobError("boot failure")

		self.record('GOOD', subdir, 'boot', 'boot successful')


	def filesystem(self, device, mountpoint = None, loop_size = 0):
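		# e.g. (hypothetical device):
		#   fs = job.filesystem('/dev/sda9')
		# returns a filesystem object defaulting its mountpoint
		# to the job tmpdir.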
		if not mountpoint:
			mountpoint = self.tmpdir
		return filesystem.filesystem(self, device, mountpoint,
								loop_size)


	def reboot(self, tag='autotest'):
		self.harness.run_reboot()
		default = self.config_get('boot.set_default')
		if default:
			self.bootloader.set_default(tag)
		else:
			self.bootloader.boot_once(tag)
		system("reboot")
		self.quit()


	def noop(self, text):
		print "job: noop: " + text


	# Job control primitives.

	def __parallel_execute(self, func, *args):
		func(*args)


	def parallel(self, *tasklist):
		"""Run tasks in parallel"""
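		# Each task is a sequence whose first element is a callable,
		# e.g. (illustrative):
		#   job.parallel([job.run_test, 'dbench'],
		#                [job.run_test, 'tbench'])
		# forks one child per task and waits for them all.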

		pids = []
		for task in tasklist:
			# Bind task now; each child runs its lambda after fork.
			pids.append(fork_start(self.resultdir,
				lambda task=task: self.__parallel_execute(*task)))
		for pid in pids:
			fork_waitfor(self.resultdir, pid)


	def quit(self):
		# XXX: should have a better name.
		self.harness.run_pause()
		raise JobContinue("more to come")


	def complete(self, status):
		"""Clean up and exit"""
		# We are about to exit 'complete' so clean up the state file.
		try:
			os.unlink(self.control + '.state')
		except:
			pass
		self.harness.run_complete()
		sys.exit(status)


	def next_step(self, step):
		"""Define the next step"""
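		# A step is a list: [function-or-name, arg1, arg2, ...].
		# From a control file (hypothetical function name):
		#   job.next_step([mkfs_step, '/dev/sda9'])
		# queues mkfs_step('/dev/sda9') and checkpoints the queue.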
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.append(step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def next_step_prepend(self, step):
		"""Insert a new step, executing first"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.insert(0, step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def step_engine(self):
		"""The stepping engine -- if the control file defines
		step_init we will be using this engine to drive multiple
		runs of the control file, executing one step per pass.
		"""
		lcl = {'job': self}

		prologue = """
from error import *
from autotest_utils import *
"""
		exec(prologue, lcl, lcl)
		execfile(self.control, lcl, lcl)

		state = self.control + '.state'
		# If there is a mid-job state file load that in and continue
		# where it indicates.  Otherwise start stepping at the passed
		# entry.
		try:
			self.steps = pickle.load(open(state, 'r'))
		except:
			if lcl.has_key('step_init'):
				self.next_step([lcl['step_init']])

		# Run the step list.
		while len(self.steps) > 0:
			step = self.steps.pop(0)
			pickle.dump(self.steps, open(state, 'w'))

			cmd = step.pop(0)
			lcl['__args'] = step
			exec(cmd + "(*__args)", lcl, lcl)


	def record(self, status_code, subdir, operation, status = ''):
		"""
		Record job-level status.

		The intent is to make this file both machine parseable and
		human readable. That involves a little more complexity, but
		really isn't all that bad ;-)

		Format is <status code>\t<subdir>\t<operation>\t<status>

		status code: (GOOD|WARN|FAIL|ABORT)
			or   START
			or   END (GOOD|WARN|FAIL|ABORT)

		subdir: MUST be a relevant subdirectory in the results,
		or None, which will be represented as '----'

		operation: description of what you ran (e.g. "dbench", or
						"mkfs -t foobar /dev/sda9")

		status: error message or "completed successfully"

		------------------------------------------------------------

		Initial tabs indicate indent levels for grouping and are
		governed by self.record_prefix.

		Multiline messages have secondary lines prefaced by a
		double space ('  ').
		"""
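		# Illustrative status file fragment for one grouped test:
		#   START	----	quick_tests
		#   	GOOD	sleeptest	sleeptest	completed successfully
		#   END GOOD	----	quick_tests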

		if subdir:
			if re.search(r'[\n\t]', subdir):
				raise ValueError("Invalid character in "
							"subdir string")
			substr = subdir
		else:
			substr = '----'

		if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$',
								status_code):
			raise ValueError("Invalid status code supplied: %s"
								% status_code)
		if re.search(r'[\n\t]', operation):
			raise ValueError("Invalid character in "
						"operation string")
		operation = operation.rstrip()
		status = status.rstrip()
		status = re.sub(r"\t", "  ", status)
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

		msg = '%s\t%s\t%s\t%s' % (status_code, substr, operation, status)

		self.harness.test_status_detail(status_code, substr,
							operation, status)
		self.harness.test_status(msg)
		print msg
		status_file = os.path.join(self.resultdir, 'status')
		open(status_file, "a").write(self.record_prefix + msg + "\n")
		if subdir:
			status_file = os.path.join(self.resultdir, subdir,
								'status')
			open(status_file, "a").write(msg + "\n")


def runjob(control, cont = False, tag = "default", harness_type = ''):
	"""The main interface to this module

	control
		The control file to use for this job.
	cont
		Whether this is the continuation of a previously started job
	tag
		The job tag string (see class job)
	harness_type
		An alternative server harness
	"""
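	# Typical invocation from the autotest wrapper (hypothetical
	# path):
	#   runjob('/usr/local/autotest/control', cont=False)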
	control = os.path.abspath(control)
	state = control + '.state'

	# Instantiate the job object ready for the control file.
	myjob = None
	try:
		# Check that the control file is valid.
		if not os.path.exists(control):
			raise JobError(control + ": control file not found")

		# When continuing, the job is complete when there is no
		# state file; ensure we don't try and continue.
		if cont and not os.path.exists(state):
			sys.exit(1)
		if not cont and os.path.exists(state):
			os.unlink(state)

		myjob = job(control, tag, cont, harness_type)

		# Load in the user's control file; it may do any one of:
		#  1) execute in toto
		#  2) define steps, and select the first via next_step()
		myjob.step_engine()

	except JobContinue:
		sys.exit(5)

	except JobError, instance:
		print "JOB ERROR: " + instance.args[0]
		if myjob != None:
			myjob.record('ABORT', None, instance.args[0])
			myjob.complete(1)

	except:
		if myjob:
			myjob.harness.run_abort()
		# Ensure we cannot continue this job, it is in rictus.
		if os.path.exists(state):
			os.unlink(state)
		raise

	# If we get here, then we assume the job is complete and good.
	myjob.complete(0)