job.py revision 30270306182f4feb599278ce8a7412fa8526eb68
1"""The main job wrapper
2
3This is the core infrastructure.
4"""
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""
7
8# standard stuff
9import os, sys, re, pickle, shutil, time
10# autotest stuff
11from autotest_utils import *
12from parallel import *
13from error import *
14import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool
15import harness, config
16import sysinfo
17
class job:
	"""The actual job against which we do everything.

	Properties:
		autodir
			The top level autotest directory (/usr/local/autotest).
			Comes from os.environ['AUTODIR'].
		bindir
			<autodir>/bin/
		testdir
			<autodir>/tests/
		profdir
			<autodir>/profilers/
		tmpdir
			<autodir>/tmp/
		resultdir
			<autodir>/results/<jobtag>
		stdout
			fd_stack object for stdout
		stderr
			fd_stack object for stderr
		profilers
			the profilers object for this job
		harness
			the server harness object for this job
		config
			the job configuration for this job
	"""

	def __init__(self, control, jobtag, cont, harness_type=None):
		"""Set up the job directory tree and helper objects.

			control
				The control file (pathname of)
			jobtag
				The job tag string (eg "default")
			cont
				If this is the continuation of this job
			harness_type
				An alternative server harness
		"""
		self.autodir = os.environ['AUTODIR']
		self.bindir = os.path.join(self.autodir, 'bin')
		self.testdir = os.path.join(self.autodir, 'tests')
		self.profdir = os.path.join(self.autodir, 'profilers')
		self.tmpdir = os.path.join(self.autodir, 'tmp')
		self.resultdir = os.path.join(self.autodir, 'results', jobtag)

		if not cont:
			# Fresh run: scrub the tmp, download and result
			# directories so we start from a clean slate.  The
			# umount covers a filesystem a previous run may have
			# left mounted on tmpdir.
			if os.path.exists(self.tmpdir):
				system('umount -f %s > /dev/null 2> /dev/null'%\
					 	self.tmpdir, ignorestatus=True)
				system('rm -rf ' + self.tmpdir)
			os.mkdir(self.tmpdir)

			results = os.path.join(self.autodir, 'results')
			if not os.path.exists(results):
				os.mkdir(results)

			download = os.path.join(self.testdir, 'download')
			if os.path.exists(download):
				system('rm -rf ' + download)
			os.mkdir(download)

			if os.path.exists(self.resultdir):
				system('rm -rf ' + self.resultdir)
			os.mkdir(self.resultdir)

			os.mkdir(os.path.join(self.resultdir, 'debug'))
			os.mkdir(os.path.join(self.resultdir, 'analysis'))
			os.mkdir(os.path.join(self.resultdir, 'sysinfo'))

			# Keep a copy of the control file with the results.
			shutil.copyfile(control, os.path.join(self.resultdir, 'control'))

		self.control = control
		self.jobtag = jobtag

		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)
		self.record_prefix = ''

		self.config = config.config(self)

		self.harness = harness.select(harness_type, self)

		self.profilers = profilers.profilers(self)

		try:
			tool = self.config_get('boottool.executable')
			self.bootloader = boottool.boottool(tool)
		except:
			# NOTE(review): bare except -- if boottool setup
			# fails, self.bootloader is left unset and a later
			# reboot() will die with AttributeError.
			pass

		# log "before each step" sysinfo
		pwd = os.getcwd()
		try:
			os.chdir(os.path.join(self.resultdir, 'sysinfo'))
			sysinfo.before_each_step()
		finally:
			os.chdir(pwd)

		if not cont:
			self.harness.run_start()


	def relative_path(self, path):
		"""\
		Return a path relative to the job results directory
		"""
		head = len(self.resultdir) + 1     # remove the / inbetween
		return path[head:]


	def control_get(self):
		"""Return the pathname of the control file."""
		return self.control


	def harness_select(self, which):
		"""Switch this job over to the named server harness."""
		self.harness = harness.select(which, self)


	def config_set(self, name, value):
		"""Set a job configuration value."""
		self.config.set(name, value)


	def config_get(self, name):
		"""Return a job configuration value."""
		return self.config.get(name)

	def setup_dirs(self, results_dir, tmp_dir):
		"""Choose (and create) the results and temp directories for
		a build.  tmp_dir defaults to <tmpdir>/build; results_dir
		defaults to the first unused of build, build.2, build.3, ...

		Returns the tuple (results_dir, tmp_dir).
		"""
		if not tmp_dir:
			tmp_dir = os.path.join(self.tmpdir, 'build')
		if not os.path.exists(tmp_dir):
			os.mkdir(tmp_dir)
		if not os.path.isdir(tmp_dir):
			# NOTE(review): string exceptions were removed in
			# Python 2.6; also this message reports self.tmpdir
			# rather than the offending tmp_dir.
			raise "Temp dir (%s) is not a dir - args backwards?" \
								% self.tmpdir

		# We label the first build "build" and then subsequent ones
		# as "build.2", "build.3", etc. Whilst this is a little bit
		# inconsistent, 99.9% of jobs will only have one build
		# (that's not done as kernbench, sparse, or buildtest),
		# so it works out much cleaner. One of life's comprimises.
		if not results_dir:
			results_dir = os.path.join(self.resultdir, 'build')
			i = 2
			while os.path.exists(results_dir):
				results_dir = os.path.join(self.resultdir, 'build.%d' % i)
				i += 1
		if not os.path.exists(results_dir):
			os.mkdir(results_dir)

		return (results_dir, tmp_dir)


	def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
				kjob = None ):
		"""Summon a xen object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'xen'
		return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob)


	def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
		"""Summon a kernel object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		# An .rpm tree is installed as-is; no build directory needed.
		if base_tree.endswith('.rpm'):
			return kernel.rpm_kernel(self, base_tree, results_dir)
		build_dir = 'linux'
		return kernel.kernel(self, base_tree, results_dir, tmp_dir, build_dir, leave)


	def barrier(self, *args):
		"""Create a barrier object"""
		return barrier.barrier(*args)


	def setup_dep(self, deps):
		"""Set up the dependencies for this test.

		deps is a list of libraries required for this test.
		"""
		# NOTE(review): chdirs into each dep directory and never
		# restores the previous working directory.
		for dep in deps:
			try:
				os.chdir(os.path.join(self.autodir, 'deps', dep))
				system('./' + dep + '.py')
			except:
				error = "setting up dependency " + dep + "\n"
				raise UnhandledError(error)


	def __runtest(self, url, tag, args, dargs):
		"""Run one test in a forked child (so a crashing test cannot
		take the job process down) and wait for it to finish.
		Autotest errors propagate; anything else is wrapped in
		UnhandledError."""
		try:
			l = lambda : test.runtest(self, url, tag, args, dargs)
			pid = fork_start(self.resultdir, l)
			fork_waitfor(self.resultdir, pid)
		except AutotestError:
			raise
		except:
			raise UnhandledError('running test ' + \
				self.__class__.__name__ + "\n")


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		tag
			tag to add to testname
		url
			url of the test to run

		Returns 1 on success, 0 on TestError; other exceptions are
		recorded as FAIL and re-raised.
		"""

		if not url:
			# NOTE(review): string exception, removed in Python 2.6.
			raise "Test name is invalid. Switched arguments?"
		(group, testname) = test.testname(url)
		tag = None
		subdir = testname
		# A 'tag' keyword argument is consumed here and appended to
		# the result subdirectory name.
		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				subdir += '.' + tag
		try:
			try:
				self.__runtest(url, tag, args, dargs)
			except Exception, detail:
				self.record('FAIL', subdir, testname, \
							detail.__str__())

				raise
			else:
				self.record('GOOD', subdir, testname, \
						'completed successfully')
		except TestError:
			return 0
		except:
			raise
		else:
			return 1


	def run_group(self, function, *args, **dargs):
		"""\
		Run a function as a named group, bracketing it with
		START / END GOOD|FAIL status records.

		function:
			subroutine to run
		*args:
			arguments for the function

		A 'tag' keyword argument, if given, replaces the group name.
		Returns the function's result (None if it raised).
		"""

		result = None
		name = function.__name__

		# Allow the tag for the group to be specified.
		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				name = tag

		old_record_prefix = self.record_prefix
		try:
			try:
				self.record('START', None, name)
				# Indent everything recorded inside the group.
				self.record_prefix += '\t'
				result = function(*args, **dargs)
				self.record_prefix = old_record_prefix
				self.record('END GOOD', None, name)
			except:
				self.record_prefix = old_record_prefix
				self.record('END FAIL', None, name, format_error())
		# We don't want to raise up an error higher if it's just
		# a TestError - we want to carry on to other tests. Hence
		# this outer try/except block.
		except TestError:
			pass
		except:
			raise TestError(name + ' failed\n' + format_error())

		return result


	# Check the passed kernel identifier against the command line
	# and the running kernel, abort the job on mismatch.
	def kernel_check_ident(self, expected_when, expected_id, expected_cl, subdir, type = 'src'):
		"""Verify after reboot that the booted kernel is the one we
		asked for; raise JobError (aborting the job) on mismatch.

			expected_when
				the IDENT= mark planted on the command line
			expected_id
				the expected kernel identity string
			expected_cl
				the expected P4 changelist (or None to skip)
			subdir
				results subdirectory to record against
			type
				'src' for an exact identity match, 'rpm' for
				a prefix match against '<id>::'
		"""
		print "POST BOOT: checking booted kernel mark=%d identity='%s' changelist=%s type='%s'" \
			% (expected_when, expected_id, expected_cl, type)

		running_id = running_os_ident()

		cmdline = read_one_line("/proc/cmdline")

		# Recover the IDENT= mark from the kernel command line.
		find_sum = re.compile(r'.*IDENT=(\d+)')
		m = find_sum.match(cmdline)
		cmdline_when = -1
		if m:
			cmdline_when = int(m.groups()[0])

		# Pull a changelist-looking number (7+ digits) out of the
		# second field of 'uname -v'.
		cl_re = re.compile(r'\d{7,}')
		cl_match = cl_re.search(system_output('uname -v').split()[1])
		if cl_match:
			current_cl = cl_match.group()
		else:
			current_cl = None

		# We have all the facts, see if they indicate we
		# booted the requested kernel or not.
		bad = False
		if (type == 'src' and expected_id != running_id or
		    type == 'rpm' and not running_id.startswith(expected_id + '::')):
			print "check_kernel_ident: kernel identifier mismatch"
			bad = True
		if expected_when != cmdline_when:
			print "check_kernel_ident: kernel command line mismatch"
			bad = True
		if expected_cl and current_cl and str(expected_cl) != current_cl:
			print 'check_kernel_ident: kernel changelist mismatch'
			bad = True

		if bad:
			print "   Expected Ident: " + expected_id
			print "    Running Ident: " + running_id
			print "    Expected Mark: %d" % (expected_when)
			print "Command Line Mark: %d" % (cmdline_when)
			print "   Expected P4 CL: %s" % expected_cl
			print "            P4 CL: %s" % current_cl
			print "     Command Line: " + cmdline

			raise JobError("boot failure", "reboot.verify")

		self.record('GOOD', subdir, 'reboot.verify')


	def filesystem(self, device, mountpoint = None, loop_size = 0):
		"""Summon a filesystem object; the mountpoint defaults to
		the job tmpdir."""
		if not mountpoint:
			mountpoint = self.tmpdir
		return filesystem.filesystem(self, device, mountpoint,loop_size)


	def reboot(self, tag='autotest'):
		"""Point the bootloader at the tagged kernel, kick off a
		reboot, and suspend this job (it continues after boot)."""
		self.record('GOOD', None, 'reboot.start')
		self.harness.run_reboot()
		default = self.config_get('boot.set_default')
		if default:
			self.bootloader.set_default(tag)
		else:
			self.bootloader.boot_once(tag)
		# Delay the reboot so our own shutdown can complete first.
		system("(sleep 5; reboot) &")
		self.quit()


	def noop(self, text):
		"""A do-nothing step, logged for visibility."""
		print "job: noop: " + text


	# Job control primatives.

	def __parallel_execute(self, func, *args):
		# Trampoline run in each forked child by parallel().
		func(*args)


	def parallel(self, *tasklist):
		"""Run tasks in parallel"""

		# Each task is a (function, args...) tuple; fork one child
		# per task, then wait for them all.
		pids = []
		for task in tasklist:
			pids.append(fork_start(self.resultdir,
					lambda: self.__parallel_execute(*task)))
		for pid in pids:
			fork_waitfor(self.resultdir, pid)


	def quit(self):
		# XXX: should have a better name.
		# Suspends the job by raising JobContinue; runjob() turns
		# that into exit status 5 so the harness knows to resume.
		self.harness.run_pause()
		raise JobContinue("more to come")


	def complete(self, status):
		"""Clean up and exit"""
		# We are about to exit 'complete' so clean up the control file.
		try:
			os.unlink(self.control + '.state')
		except:
			pass
		self.harness.run_complete()
		sys.exit(status)


	# NOTE(review): mutable class-level attribute -- the step list is
	# shared by all instances until one rebinds self.steps (as
	# step_engine does when it loads saved state).
	steps = []
	def next_step(self, step):
		"""Define the next step"""
		# Steps are pickled to <control>.state so a continued job
		# can resume where it left off; store functions by name so
		# they can be looked up in the control file namespace.
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.append(step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def next_step_prepend(self, step):
		"""Insert a new step, executing first"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.insert(0, step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def step_engine(self):
		"""the stepping engine -- if the control file defines
		step_init we will be using this engine to drive multiple runs.
		"""
		"""Do the next step"""
		# Namespace in which the control file and its steps run.
		lcl = dict({'job': self})

		# NOTE(review): 'str' shadows the builtin within this method.
		str = """
from error import *
from autotest_utils import *
"""
		exec(str, lcl, lcl)
		execfile(self.control, lcl, lcl)

		state = self.control + '.state'
		# If there is a mid-job state file load that in and continue
		# where it indicates.  Otherwise start stepping at the passed
		# entry.
		try:
			self.steps = pickle.load(open(state, 'r'))
		except:
			if lcl.has_key('step_init'):
				self.next_step([lcl['step_init']])

		# Run the step list.
		while len(self.steps) > 0:
			step = self.steps.pop(0)
			# Persist the remaining steps before running this
			# one, so a reboot mid-step resumes correctly.
			pickle.dump(self.steps, open(state, 'w'))

			cmd = step.pop(0)
			lcl['__args'] = step
			exec(cmd + "(*__args)", lcl, lcl)


	def record(self, status_code, subdir, operation, status = ''):
		"""
		Record job-level status

		The intent is to make this file both machine parseable and
		human readable. That involves a little more complexity, but
		really isn't all that bad ;-)

		Format is <status code>\t<subdir>\t<operation>\t<status>

		status code: (GOOD|WARN|FAIL|ABORT)
			or   START
			or   END (GOOD|WARN|FAIL|ABORT)

		subdir: MUST be a relevant subdirectory in the results,
		or None, which will be represented as '----'

		operation: description of what you ran (e.g. "dbench", or
						"mkfs -t foobar /dev/sda9")

		status: error message or "completed sucessfully"

		------------------------------------------------------------

		Initial tabs indicate indent levels for grouping, and is
		governed by self.record_prefix

		multiline messages have secondary lines prefaced by a double
		space ('  ')
		"""

		if subdir:
			# NOTE(review): re.match only tests at the start of
			# the string, so embedded tabs/newlines are missed;
			# re.search was probably intended (likewise for
			# operation below).
			if re.match(r'[\n\t]', subdir):
				raise "Invalid character in subdir string"
			substr = subdir
		else:
			substr = '----'

		if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
								status_code):
			raise "Invalid status code supplied: %s" % status_code
		if not operation:
			operation = '----'
		if re.match(r'[\n\t]', operation):
			raise "Invalid character in operation string"
		operation = operation.rstrip()
		status = status.rstrip()
		status = re.sub(r"\t", "  ", status)
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

		# Generate timestamps for inclusion in the logs
		epoch_time = int(time.time())  # seconds since epoch, in UTC
		local_time = time.localtime(epoch_time)
		epoch_time_str = "timestamp=%d" % (epoch_time,)
		local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
					       local_time)

		msg = '\t'.join(str(x) for x in (status_code, substr, operation,
						 epoch_time_str, local_time_str,
						 status))

		self.harness.test_status_detail(status_code, substr,
							operation, status)
		self.harness.test_status(msg)
		print msg
		# Append to the job-wide status file, and additionally to
		# the per-test status file when a subdir was given.
		status_file = os.path.join(self.resultdir, 'status')
		open(status_file, "a").write(self.record_prefix + msg + "\n")
		if subdir:
			status_file = os.path.join(self.resultdir, subdir, 'status')
			open(status_file, "a").write(msg + "\n")
529
530
531def runjob(control, cont = False, tag = "default", harness_type = ''):
532	"""The main interface to this module
533
534	control
535		The control file to use for this job.
536	cont
537		Whether this is the continuation of a previously started job
538	"""
539	control = os.path.abspath(control)
540	state = control + '.state'
541
542	# instantiate the job object ready for the control file.
543	myjob = None
544	try:
545		# Check that the control file is valid
546		if not os.path.exists(control):
547			raise JobError(control + ": control file not found")
548
549		# When continuing, the job is complete when there is no
550		# state file, ensure we don't try and continue.
551		if cont and not os.path.exists(state):
552			sys.exit(1)
553		if cont == False and os.path.exists(state):
554			os.unlink(state)
555
556		myjob = job(control, tag, cont, harness_type)
557
558		# Load in the users control file, may do any one of:
559		#  1) execute in toto
560		#  2) define steps, and select the first via next_step()
561		myjob.step_engine()
562
563	except JobContinue:
564		sys.exit(5)
565
566	except JobError, instance:
567		print "JOB ERROR: " + instance.args[0]
568		if myjob:
569			command = None
570			if len(instance.args) > 1:
571				command = instance.args[1]
572			myjob.record('ABORT', None, command, instance.args[0])
573			myjob.complete(1)
574
575
576	except:
577		print "JOB ERROR: " + format_error()
578		if myjob:
579			myjob.record('ABORT', None, None, format_error())
580			myjob.complete(1)
581
582	# If we get here, then we assume the job is complete and good.
583	myjob.record('GOOD', None, None, 'job completed sucessfully')
584	myjob.complete(0)
585
586
587