job.py revision f91efaf53bf840d9545c3c0ceaa928d8fbb6df3b
1"""The main job wrapper
2
3This is the core infrastructure.
4"""
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""
7
8# standard stuff
9import os, sys, re, pickle, shutil, time, traceback
10# autotest stuff
11from autotest_utils import *
12from parallel import *
13from error import *
14import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool
15import harness, config
16import sysinfo
17
18class job:
19	"""The actual job against which we do everything.
20
21	Properties:
22		autodir
23			The top level autotest directory (/usr/local/autotest).
24			Comes from os.environ['AUTODIR'].
25		bindir
26			<autodir>/bin/
27		testdir
28			<autodir>/tests/
29		profdir
30			<autodir>/profilers/
31		tmpdir
32			<autodir>/tmp/
33		resultdir
34			<autodir>/results/<jobtag>
35		stdout
36			fd_stack object for stdout
37		stderr
38			fd_stack object for stderr
39		profilers
40			the profilers object for this job
41		harness
42			the server harness object for this job
43		config
44			the job configuration for this job
45	"""
46
47	def __init__(self, control, jobtag, cont, harness_type=None):
48		"""
49			control
50				The control file (pathname of)
51			jobtag
52				The job tag string (eg "default")
53			cont
54				If this is the continuation of this job
55			harness_type
56				An alternative server harness
57		"""
58		self.autodir = os.environ['AUTODIR']
59		self.bindir = os.path.join(self.autodir, 'bin')
60		self.testdir = os.path.join(self.autodir, 'tests')
61		self.profdir = os.path.join(self.autodir, 'profilers')
62		self.tmpdir = os.path.join(self.autodir, 'tmp')
63		self.resultdir = os.path.join(self.autodir, 'results', jobtag)
64
65		if not cont:
66			if os.path.exists(self.tmpdir):
67				system('umount -f %s > /dev/null 2> /dev/null'%\
68					 	self.tmpdir, ignorestatus=True)
69				system('rm -rf ' + self.tmpdir)
70			os.mkdir(self.tmpdir)
71
72			results = os.path.join(self.autodir, 'results')
73			if not os.path.exists(results):
74				os.mkdir(results)
75
76			download = os.path.join(self.testdir, 'download')
77			if os.path.exists(download):
78				system('rm -rf ' + download)
79			os.mkdir(download)
80
81			if os.path.exists(self.resultdir):
82				system('rm -rf ' + self.resultdir)
83			os.mkdir(self.resultdir)
84
85			os.mkdir(os.path.join(self.resultdir, 'debug'))
86			os.mkdir(os.path.join(self.resultdir, 'analysis'))
87			os.mkdir(os.path.join(self.resultdir, 'sysinfo'))
88
89			shutil.copyfile(control, os.path.join(self.resultdir, 'control'))
90
91		self.control = control
92		self.jobtag = jobtag
93
94		self.stdout = fd_stack.fd_stack(1, sys.stdout)
95		self.stderr = fd_stack.fd_stack(2, sys.stderr)
96		self.group_level = 0
97
98		self.config = config.config(self)
99
100		self.harness = harness.select(harness_type, self)
101
102		self.profilers = profilers.profilers(self)
103
104		try:
105			tool = self.config_get('boottool.executable')
106			self.bootloader = boottool.boottool(tool)
107		except:
108			pass
109
110		# log "before each step" sysinfo
111		pwd = os.getcwd()
112		try:
113			os.chdir(os.path.join(self.resultdir, 'sysinfo'))
114			sysinfo.before_each_step()
115		finally:
116			os.chdir(pwd)
117
118		if not cont:
119			self.record('START', None, None)
120		self.group_level = 1
121
122		self.harness.run_start()
123
124
125	def relative_path(self, path):
126		"""\
127		Return a patch relative to the job results directory
128		"""
129		head = len(self.resultdir) + 1     # remove the / inbetween
130		return path[head:]
131
132
133	def control_get(self):
134		return self.control
135
136
137	def harness_select(self, which):
138		self.harness = harness.select(which, self)
139
140
141	def config_set(self, name, value):
142		self.config.set(name, value)
143
144
145	def config_get(self, name):
146		return self.config.get(name)
147
148	def setup_dirs(self, results_dir, tmp_dir):
149		if not tmp_dir:
150			tmp_dir = os.path.join(self.tmpdir, 'build')
151		if not os.path.exists(tmp_dir):
152			os.mkdir(tmp_dir)
153		if not os.path.isdir(tmp_dir):
154			raise "Temp dir (%s) is not a dir - args backwards?" \
155								% self.tmpdir
156
157		# We label the first build "build" and then subsequent ones
158		# as "build.2", "build.3", etc. Whilst this is a little bit
159		# inconsistent, 99.9% of jobs will only have one build
160		# (that's not done as kernbench, sparse, or buildtest),
161		# so it works out much cleaner. One of life's comprimises.
162		if not results_dir:
163			results_dir = os.path.join(self.resultdir, 'build')
164			i = 2
165			while os.path.exists(results_dir):
166				results_dir = os.path.join(self.resultdir, 'build.%d' % i)
167				i += 1
168		if not os.path.exists(results_dir):
169			os.mkdir(results_dir)
170
171		return (results_dir, tmp_dir)
172
173
174	def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
175				kjob = None ):
176		"""Summon a xen object"""
177		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
178		build_dir = 'xen'
179		return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob)
180
181
182	def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
183		"""Summon a kernel object"""
184		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
185		build_dir = 'linux'
186		return kernel.auto_kernel(self, base_tree, results_dir,
187					  tmp_dir, build_dir, leave)
188
189
190	def barrier(self, *args):
191		"""Create a barrier object"""
192		return barrier.barrier(*args)
193
194
195	def setup_dep(self, deps):
196		"""Set up the dependencies for this test.
197
198		deps is a list of libraries required for this test.
199		"""
200		for dep in deps:
201			try:
202				os.chdir(os.path.join(self.autodir, 'deps', dep))
203				system('./' + dep + '.py')
204			except:
205				error = "setting up dependency " + dep + "\n"
206				raise UnhandledError(error)
207
208
209	def __runtest(self, url, tag, args, dargs):
210		try:
211			l = lambda : test.runtest(self, url, tag, args, dargs)
212			pid = fork_start(self.resultdir, l)
213			fork_waitfor(self.resultdir, pid)
214		except AutotestError:
215			raise
216		except:
217			raise UnhandledError('running test ' + \
218				self.__class__.__name__ + "\n")
219
220
221	def run_test(self, url, *args, **dargs):
222		"""Summon a test object and run it.
223
224		tag
225			tag to add to testname
226		url
227			url of the test to run
228		"""
229
230		if not url:
231			raise "Test name is invalid. Switched arguments?"
232		(group, testname) = test.testname(url)
233		tag = dargs.pop('tag', None)
234		subdir = testname
235		if tag:
236			subdir += '.' + tag
237
238		def group_func():
239			try:
240				self.__runtest(url, tag, args, dargs)
241			except Exception, detail:
242				self.record('FAIL', subdir, testname,
243					    str(detail))
244				raise
245			else:
246				self.record('GOOD', subdir, testname,
247					    'completed successfully')
248		result, exc_info = self.__rungroup(subdir, group_func)
249
250		if exc_info and isinstance(exc_info[1], TestError):
251			return False
252		elif exc_info:
253			raise exc_info[0], exc_info[1], exc_info[2]
254		else:
255			return True
256
257
258	def __rungroup(self, name, function, *args, **dargs):
259		"""\
260		name:
261		        name of the group
262		function:
263			subroutine to run
264		*args:
265			arguments for the function
266
267		Returns a 2-tuple (result, exc_info) where result
268		is the return value of function, and exc_info is
269		the sys.exc_info() of the exception thrown by the
270		function (which may be None).
271		"""
272
273		result, exc_info = None, None
274		try:
275			self.record('START', None, name)
276			self.group_level += 1
277			result = function(*args, **dargs)
278			self.group_level -= 1
279			self.record('END GOOD', None, name)
280		except Exception, e:
281			exc_info = sys.exc_info()
282			self.group_level -= 1
283			err_msg = str(e) + '\n' + format_error()
284			self.record('END FAIL', None, name, err_msg)
285
286		return result, exc_info
287
288
289	def run_group(self, function, *args, **dargs):
290		"""\
291		function:
292			subroutine to run
293		*args:
294			arguments for the function
295		"""
296
297		# Allow the tag for the group to be specified
298		name = function.__name__
299		tag = dargs.pop('tag', None)
300		if tag:
301			name = tag
302
303		result, exc_info = self.__rungroup(name, function,
304						   *args, **dargs)
305
306		# if there was a non-TestError exception, raise it
307		if exc_info and isinstance(exc_info[1], TestError):
308			err = ''.join(traceback.format_exception(*exc_info))
309			raise TestError(name + ' failed\n' + err)
310
311		# pass back the actual return value from the function
312		return result
313
314
315	# Check the passed kernel identifier against the command line
316	# and the running kernel, abort the job on missmatch.
317	def kernel_check_ident(self, expected_when, expected_id, expected_cl, subdir, type = 'src'):
318		print "POST BOOT: checking booted kernel mark=%d identity='%s' changelist=%s type='%s'" \
319			% (expected_when, expected_id, expected_cl, type)
320
321		running_id = running_os_ident()
322
323		cmdline = read_one_line("/proc/cmdline")
324
325		find_sum = re.compile(r'.*IDENT=(\d+)')
326		m = find_sum.match(cmdline)
327		cmdline_when = -1
328		if m:
329			cmdline_when = int(m.groups()[0])
330
331		cl_re = re.compile(r'\d{7,}')
332		cl_match = cl_re.search(system_output('uname -v').split()[1])
333		if cl_match:
334			current_cl = cl_match.group()
335		else:
336			current_cl = None
337
338		# We have all the facts, see if they indicate we
339		# booted the requested kernel or not.
340		bad = False
341		if (type == 'src' and expected_id != running_id or
342		    type == 'rpm' and not running_id.startswith(expected_id + '::')):
343			print "check_kernel_ident: kernel identifier mismatch"
344			bad = True
345		if expected_when != cmdline_when:
346			print "check_kernel_ident: kernel command line mismatch"
347			bad = True
348		if expected_cl and current_cl and str(expected_cl) != current_cl:
349			print 'check_kernel_ident: kernel changelist mismatch'
350			bad = True
351
352		if bad:
353			print "   Expected Ident: " + expected_id
354			print "    Running Ident: " + running_id
355			print "    Expected Mark: %d" % (expected_when)
356			print "Command Line Mark: %d" % (cmdline_when)
357			print "   Expected P4 CL: %s" % expected_cl
358			print "            P4 CL: %s" % current_cl
359			print "     Command Line: " + cmdline
360
361			raise JobError("boot failure", "reboot.verify")
362
363		self.record('GOOD', subdir, 'reboot.verify')
364
365
366	def filesystem(self, device, mountpoint = None, loop_size = 0):
367		if not mountpoint:
368			mountpoint = self.tmpdir
369		return filesystem.filesystem(self, device, mountpoint,loop_size)
370
371
372	def reboot(self, tag='autotest'):
373		self.record('GOOD', None, 'reboot.start')
374		self.harness.run_reboot()
375		default = self.config_get('boot.set_default')
376		if default:
377			self.bootloader.set_default(tag)
378		else:
379			self.bootloader.boot_once(tag)
380		system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
381		self.quit()
382
383
384	def noop(self, text):
385		print "job: noop: " + text
386
387
388	# Job control primatives.
389
390	def __parallel_execute(self, func, *args):
391		func(*args)
392
393
394	def parallel(self, *tasklist):
395		"""Run tasks in parallel"""
396
397		pids = []
398		for task in tasklist:
399			pids.append(fork_start(self.resultdir,
400					lambda: self.__parallel_execute(*task)))
401		for pid in pids:
402			fork_waitfor(self.resultdir, pid)
403
404
405	def quit(self):
406		# XXX: should have a better name.
407		self.harness.run_pause()
408		raise JobContinue("more to come")
409
410
411	def complete(self, status):
412		"""Clean up and exit"""
413		# We are about to exit 'complete' so clean up the control file.
414		try:
415			os.unlink(self.control + '.state')
416		except:
417			pass
418		self.harness.run_complete()
419		sys.exit(status)
420
421
422	steps = []
423	def next_step(self, step):
424		"""Define the next step"""
425		if not isinstance(step[0], basestring):
426			step[0] = step[0].__name__
427		self.steps.append(step)
428		pickle.dump(self.steps, open(self.control + '.state', 'w'))
429
430
431	def next_step_prepend(self, step):
432		"""Insert a new step, executing first"""
433		if not isinstance(step[0], basestring):
434			step[0] = step[0].__name__
435		self.steps.insert(0, step)
436		pickle.dump(self.steps, open(self.control + '.state', 'w'))
437
438
439	def step_engine(self):
440		"""the stepping engine -- if the control file defines
441		step_init we will be using this engine to drive multiple runs.
442		"""
443		"""Do the next step"""
444		lcl = dict({'job': self})
445
446		str = """
447from error import *
448from autotest_utils import *
449"""
450		exec(str, lcl, lcl)
451		execfile(self.control, lcl, lcl)
452
453		state = self.control + '.state'
454		# If there is a mid-job state file load that in and continue
455		# where it indicates.  Otherwise start stepping at the passed
456		# entry.
457		try:
458			self.steps = pickle.load(open(state, 'r'))
459		except:
460			if lcl.has_key('step_init'):
461				self.next_step([lcl['step_init']])
462
463		# Run the step list.
464		while len(self.steps) > 0:
465			step = self.steps.pop(0)
466			pickle.dump(self.steps, open(state, 'w'))
467
468			cmd = step.pop(0)
469			lcl['__args'] = step
470			exec(cmd + "(*__args)", lcl, lcl)
471
472
473	def record(self, status_code, subdir, operation, status = ''):
474		"""
475		Record job-level status
476
477		The intent is to make this file both machine parseable and
478		human readable. That involves a little more complexity, but
479		really isn't all that bad ;-)
480
481		Format is <status code>\t<subdir>\t<operation>\t<status>
482
483		status code: (GOOD|WARN|FAIL|ABORT)
484			or   START
485			or   END (GOOD|WARN|FAIL|ABORT)
486
487		subdir: MUST be a relevant subdirectory in the results,
488		or None, which will be represented as '----'
489
490		operation: description of what you ran (e.g. "dbench", or
491						"mkfs -t foobar /dev/sda9")
492
493		status: error message or "completed sucessfully"
494
495		------------------------------------------------------------
496
497		Initial tabs indicate indent levels for grouping, and is
498		governed by self.group_level
499
500		multiline messages have secondary lines prefaced by a double
501		space ('  ')
502		"""
503
504		if subdir:
505			if re.match(r'[\n\t]', subdir):
506				raise "Invalid character in subdir string"
507			substr = subdir
508		else:
509			substr = '----'
510
511		if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
512								status_code):
513			raise "Invalid status code supplied: %s" % status_code
514		if not operation:
515			operation = '----'
516		if re.match(r'[\n\t]', operation):
517			raise "Invalid character in operation string"
518		operation = operation.rstrip()
519		status = status.rstrip()
520		status = re.sub(r"\t", "  ", status)
521		# Ensure any continuation lines are marked so we can
522		# detect them in the status file to ensure it is parsable.
523		status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)
524
525		# Generate timestamps for inclusion in the logs
526		epoch_time = int(time.time())  # seconds since epoch, in UTC
527		local_time = time.localtime(epoch_time)
528		epoch_time_str = "timestamp=%d" % (epoch_time,)
529		local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
530					       local_time)
531
532		msg = '\t'.join(str(x) for x in (status_code, substr, operation,
533						 epoch_time_str, local_time_str,
534						 status))
535		msg = '\t' * self.group_level + msg
536
537		self.harness.test_status_detail(status_code, substr,
538							operation, status)
539		self.harness.test_status(msg)
540		print msg
541		status_file = os.path.join(self.resultdir, 'status')
542		open(status_file, "a").write(msg + "\n")
543		if subdir:
544			status_file = os.path.join(self.resultdir, subdir, 'status')
545			open(status_file, "a").write(msg + "\n")
546
547
548def runjob(control, cont = False, tag = "default", harness_type = ''):
549	"""The main interface to this module
550
551	control
552		The control file to use for this job.
553	cont
554		Whether this is the continuation of a previously started job
555	"""
556	control = os.path.abspath(control)
557	state = control + '.state'
558
559	# instantiate the job object ready for the control file.
560	myjob = None
561	try:
562		# Check that the control file is valid
563		if not os.path.exists(control):
564			raise JobError(control + ": control file not found")
565
566		# When continuing, the job is complete when there is no
567		# state file, ensure we don't try and continue.
568		if cont and not os.path.exists(state):
569			sys.exit(1)
570		if cont == False and os.path.exists(state):
571			os.unlink(state)
572
573		myjob = job(control, tag, cont, harness_type)
574
575		# Load in the users control file, may do any one of:
576		#  1) execute in toto
577		#  2) define steps, and select the first via next_step()
578		myjob.step_engine()
579
580	except JobContinue:
581		sys.exit(5)
582
583	except JobError, instance:
584		print "JOB ERROR: " + instance.args[0]
585		if myjob:
586			command = None
587			if len(instance.args) > 1:
588				command = instance.args[1]
589			myjob.group_level = 0
590			myjob.record('ABORT', None, command, instance.args[0])
591			myjob.record('END ABORT', None, None)
592			myjob.complete(1)
593
594	except Exception, e:
595		msg = str(e) + '\n' + format_error()
596		print "JOB ERROR: " + msg
597		if myjob:
598			myjob.group_level = 0
599			myjob.record('ABORT', None, None, msg)
600			myjob.record('END ABORT', None, None)
601			myjob.complete(1)
602
603	# If we get here, then we assume the job is complete and good.
604	myjob.group_level = 0
605	myjob.record('END GOOD', None, None)
606	myjob.complete(0)
607