job.py revision 8d83cdc678ea7dcfc6780a029d39f841f548d859
1"""The main job wrapper
2
3This is the core infrastructure.
4"""
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""
7
8# standard stuff
9import os, sys, re, pickle, shutil, time, traceback
10# autotest stuff
11from autotest_utils import *
12from parallel import *
13from common.error import *
14import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool
15import harness, config
16import sysinfo

class job:
	"""The actual job against which we do everything.

	Properties:
		autodir
			The top level autotest directory (/usr/local/autotest).
			Comes from os.environ['AUTODIR'].
		bindir
			<autodir>/bin/
		testdir
			<autodir>/tests/
		profdir
			<autodir>/profilers/
		tmpdir
			<autodir>/tmp/
		resultdir
			<autodir>/results/<jobtag>
		stdout
			fd_stack object for stdout
		stderr
			fd_stack object for stderr
		profilers
			the profilers object for this job
		harness
			the server harness object for this job
		config
			the job configuration for this job
	"""

	def __init__(self, control, jobtag, cont, harness_type=None):
		"""
			control
				The control file (pathname of)
			jobtag
				The job tag string (eg "default")
			cont
				If this is the continuation of this job
			harness_type
				An alternative server harness
		"""
		self.autodir = os.environ['AUTODIR']
		self.bindir = os.path.join(self.autodir, 'bin')
		self.testdir = os.path.join(self.autodir, 'tests')
		self.profdir = os.path.join(self.autodir, 'profilers')
		self.tmpdir = os.path.join(self.autodir, 'tmp')
		self.resultdir = os.path.join(self.autodir, 'results', jobtag)
		self.control = os.path.abspath(control)

		if not cont:
			if os.path.exists(self.tmpdir):
				system('umount -f %s > /dev/null 2> /dev/null' %
						self.tmpdir, ignorestatus=True)
				system('rm -rf ' + self.tmpdir)
			os.mkdir(self.tmpdir)

			results = os.path.join(self.autodir, 'results')
			if not os.path.exists(results):
				os.mkdir(results)

			download = os.path.join(self.testdir, 'download')
			if os.path.exists(download):
				system('rm -rf ' + download)
			os.mkdir(download)

			if os.path.exists(self.resultdir):
				system('rm -rf ' + self.resultdir)
			os.mkdir(self.resultdir)

			os.mkdir(os.path.join(self.resultdir, 'debug'))
			os.mkdir(os.path.join(self.resultdir, 'analysis'))
			os.mkdir(os.path.join(self.resultdir, 'sysinfo'))

			shutil.copyfile(self.control,
					os.path.join(self.resultdir, 'control'))

		self.control = control
		self.jobtag = jobtag

		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)
		self.group_level = 0
		# per-instance step queue (was a shared class attribute)
		self.steps = []

		self.config = config.config(self)

		self.harness = harness.select(harness_type, self)

		self.profilers = profilers.profilers(self)

		# set up the bootloader; any failure here is ignored
		# (boottool may not be configured on this machine)
		try:
			tool = self.config_get('boottool.executable')
			self.bootloader = boottool.boottool(tool)
		except:
			pass

		# log "before each step" sysinfo
		pwd = os.getcwd()
		try:
			os.chdir(os.path.join(self.resultdir, 'sysinfo'))
			sysinfo.before_each_step()
		finally:
			os.chdir(pwd)

		if not cont:
			self.record('START', None, None)
		self.group_level = 1

		self.harness.run_start()


	def relative_path(self, path):
		"""\
		Return a path relative to the job results directory
		"""
		head = len(self.resultdir) + 1     # remove the / in between
		return path[head:]


	def control_get(self):
		return self.control


	def control_set(self, control):
		self.control = os.path.abspath(control)


	def harness_select(self, which):
		self.harness = harness.select(which, self)


	def config_set(self, name, value):
		self.config.set(name, value)


	def config_get(self, name):
		return self.config.get(name)

	def setup_dirs(self, results_dir, tmp_dir):
		if not tmp_dir:
			tmp_dir = os.path.join(self.tmpdir, 'build')
		if not os.path.exists(tmp_dir):
			os.mkdir(tmp_dir)
		if not os.path.isdir(tmp_dir):
			raise JobError("Temp dir (%s) is not a dir - args backwards?"
								% tmp_dir)

		# We label the first build "build" and then subsequent ones
		# as "build.2", "build.3", etc. Whilst this is a little bit
		# inconsistent, 99.9% of jobs will only have one build
		# (excluding those done by kernbench, sparse, or buildtest),
		# so it works out much cleaner. One of life's compromises.
		if not results_dir:
			results_dir = os.path.join(self.resultdir, 'build')
			i = 2
			while os.path.exists(results_dir):
				results_dir = os.path.join(self.resultdir, 'build.%d' % i)
				i += 1
		if not os.path.exists(results_dir):
			os.mkdir(results_dir)

		return (results_dir, tmp_dir)


	def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
				kjob = None ):
		"""Summon a xen object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'xen'
		return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob)


	def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
		"""Summon a kernel object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'linux'
		return kernel.auto_kernel(self, base_tree, results_dir,
					  tmp_dir, build_dir, leave)
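
	# Illustrative control-file sketch (the tarball path is made up,
	# and the config/build/boot methods are assumed from the kernel
	# module's usual interface):
	#
	#	my_kernel = job.kernel('/usr/local/src/linux-2.6.18.tar.bz2')
	#	my_kernel.config('/usr/local/src/config')
	#	my_kernel.build()
	#	my_kernel.boot()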


	def barrier(self, *args):
		"""Create a barrier object"""
		return barrier.barrier(*args)


	def setup_dep(self, deps):
		"""Set up the dependencies for this test.

		deps is a list of libraries required for this test.
		"""
		for dep in deps:
			try:
				os.chdir(os.path.join(self.autodir, 'deps', dep))
				system('./' + dep + '.py')
			except:
				error = "setting up dependency " + dep + "\n"
				raise UnhandledError(error)
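
	# For example, a test needing the libaio library might call
	# (assuming a deps/libaio directory with a libaio.py setup script):
	#
	#	self.job.setup_dep(['libaio'])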


	def __runtest(self, url, tag, args, dargs):
		try:
			l = lambda : test.runtest(self, url, tag, args, dargs)
			pid = fork_start(self.resultdir, l)
			fork_waitfor(self.resultdir, pid)
		except AutotestError:
			raise
		except:
			raise UnhandledError('running test ' + url + "\n")


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		url
			url of the test to run
		tag
			optional keyword argument; tag to add to testname
		"""

		if not url:
			raise JobError("Test name is invalid. Switched arguments?")
		(group, testname) = test.testname(url)
		tag = dargs.pop('tag', None)
		subdir = testname
		if tag:
			subdir += '.' + tag

		def group_func():
			try:
				self.__runtest(url, tag, args, dargs)
			except Exception, detail:
				self.record('FAIL', subdir, testname,
					    str(detail))
				raise
			else:
				self.record('GOOD', subdir, testname,
					    'completed successfully')
		result, exc_info = self.__rungroup(subdir, group_func)

		if exc_info and isinstance(exc_info[1], TestError):
			return False
		elif exc_info:
			raise exc_info[0], exc_info[1], exc_info[2]
		else:
			return True
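
	# Typical control-file usage (a sketch; 'dbench' stands in for any
	# test under <autodir>/tests, and the keyword arguments are
	# whatever that test accepts):
	#
	#	job.run_test('dbench', tag='quick')
	#
	# Returns True on success, False if the test raised a TestError;
	# any other exception propagates.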


	def __rungroup(self, name, function, *args, **dargs):
		"""\
		name:
			name of the group
		function:
			subroutine to run
		*args:
			arguments for the function

		Returns a 2-tuple (result, exc_info) where result
		is the return value of function, and exc_info is
		the sys.exc_info() of the exception thrown by the
		function (which may be None).
		"""

		result, exc_info = None, None
		try:
			self.record('START', None, name)
			self.group_level += 1
			result = function(*args, **dargs)
			self.group_level -= 1
			self.record('END GOOD', None, name)
		except Exception, e:
			exc_info = sys.exc_info()
			self.group_level -= 1
			err_msg = str(e) + '\n' + format_error()
			self.record('END FAIL', None, name, err_msg)

		return result, exc_info


	def run_group(self, function, *args, **dargs):
		"""\
		function:
			subroutine to run
		*args:
			arguments for the function
		"""

		# Allow the tag for the group to be specified
		name = function.__name__
		tag = dargs.pop('tag', None)
		if tag:
			name = tag

		result, exc_info = self.__rungroup(name, function,
						   *args, **dargs)

		# if there was a non-TestError exception, raise it
		if exc_info and not isinstance(exc_info[1], TestError):
			err = ''.join(traceback.format_exception(*exc_info))
			raise TestError(name + ' failed\n' + err)

		# pass back the actual return value from the function
		return result
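
	# Illustrative control-file usage (a sketch; the function body is
	# made up):
	#
	#	def net_tests():
	#		job.run_test('netperf2', tag='tcp')
	#		job.run_test('netperf2', tag='udp')
	#	job.run_group(net_tests, tag='network')
	#
	# All records emitted inside the group are indented one level
	# deeper in the status file.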


	# Check the passed kernel identifier against the command line
	# and the running kernel, abort the job on mismatch.
	def kernel_check_ident(self, expected_when, expected_id, expected_cl, subdir, type = 'src'):
		print "POST BOOT: checking booted kernel mark=%d identity='%s' changelist=%s type='%s'" \
			% (expected_when, expected_id, expected_cl, type)

		running_id = running_os_ident()

		cmdline = read_one_line("/proc/cmdline")

		find_sum = re.compile(r'.*IDENT=(\d+)')
		m = find_sum.match(cmdline)
		cmdline_when = -1
		if m:
			cmdline_when = int(m.groups()[0])

		cl_re = re.compile(r'\d{7,}')
		cl_match = cl_re.search(system_output('uname -v').split()[1])
		if cl_match:
			current_cl = cl_match.group()
		else:
			current_cl = None

		# We have all the facts, see if they indicate we
		# booted the requested kernel or not.
		bad = False
		if ((type == 'src' and expected_id != running_id) or
		    (type == 'rpm' and not running_id.startswith(expected_id + '::'))):
			print "kernel_check_ident: kernel identifier mismatch"
			bad = True
		if expected_when != cmdline_when:
			print "kernel_check_ident: kernel command line mismatch"
			bad = True
		if expected_cl and current_cl and str(expected_cl) != current_cl:
			print 'kernel_check_ident: kernel changelist mismatch'
			bad = True

		if bad:
			print "   Expected Ident: " + expected_id
			print "    Running Ident: " + running_id
			print "    Expected Mark: %d" % (expected_when)
			print "Command Line Mark: %d" % (cmdline_when)
			print "   Expected P4 CL: %s" % expected_cl
			print "            P4 CL: %s" % current_cl
			print "     Command Line: " + cmdline

			raise JobError("boot failure", "reboot.verify")

		self.record('GOOD', subdir, 'reboot.verify')


	def filesystem(self, device, mountpoint = None, loop_size = 0):
		if not mountpoint:
			mountpoint = self.tmpdir
		return filesystem.filesystem(self, device, mountpoint, loop_size)


	def reboot(self, tag='autotest'):
		self.record('GOOD', None, 'reboot.start')
		self.harness.run_reboot()
		default = self.config_get('boot.set_default')
		if default:
			self.bootloader.set_default(tag)
		else:
			self.bootloader.boot_once(tag)
		system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
		self.quit()


	def noop(self, text):
		print "job: noop: " + text


	# Job control primitives.

	def __parallel_execute(self, func, *args):
		func(*args)


	def parallel(self, *tasklist):
		"""Run tasks in parallel"""

		pids = []
		for task in tasklist:
			# bind task as a default argument so each forked
			# child sees its own task, not the loop variable
			pids.append(fork_start(self.resultdir,
					lambda task=task: self.__parallel_execute(*task)))
		for pid in pids:
			fork_waitfor(self.resultdir, pid)
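
	# Each task is a sequence of (function, args...), e.g. (a sketch
	# with made-up test names):
	#
	#	job.parallel(
	#		(job.run_test, 'dbench'),
	#		(job.run_test, 'bonnie'),
	#	)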


	def quit(self):
		# XXX: should have a better name.
		self.harness.run_pause()
		raise JobContinue("more to come")


	def complete(self, status):
		"""Clean up and exit"""
		# We are about to exit 'complete' so clean up the control file.
		try:
			os.unlink(self.control + '.state')
		except:
			pass
		self.harness.run_complete()
		sys.exit(status)


	def next_step(self, step):
		"""Define the next step"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.append(step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def next_step_prepend(self, step):
		"""Insert a new step, executing first"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.insert(0, step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))
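
	# A stepped control file queues steps and lets the engine drive
	# execution across reboots, e.g. (a sketch; step_test is made up):
	#
	#	def step_init():
	#		job.next_step([step_test])
	#		job.reboot()
	#	def step_test():
	#		job.run_test('kernbench')
	#
	# Each step is a list: a function (or its name) followed by its
	# arguments.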


	def step_engine(self):
		"""The stepping engine -- if the control file defines
		step_init we will be using this engine to drive multiple
		runs; execute the next step.
		"""
		lcl = {'job': self}

		prologue = """
from common.error import *
from autotest_utils import *
"""
		exec(prologue, lcl, lcl)
		execfile(self.control, lcl, lcl)

		state = self.control + '.state'
		# If there is a mid-job state file load that in and continue
		# where it indicates.  Otherwise start stepping at the passed
		# entry.
		try:
			self.steps = pickle.load(open(state, 'r'))
		except:
			if lcl.has_key('step_init'):
				self.next_step([lcl['step_init']])

		# Run the step list.
		while len(self.steps) > 0:
			step = self.steps.pop(0)
			pickle.dump(self.steps, open(state, 'w'))

			cmd = step.pop(0)
			lcl['__args'] = step
			exec(cmd + "(*__args)", lcl, lcl)
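
	# For instance, a queued step ['step_test', 'arg'] is executed as
	# step_test('arg') in the control file's namespace ('step_test'
	# here is a hypothetical name defined by the control file).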


	def record(self, status_code, subdir, operation, status = ''):
		"""
		Record job-level status

		The intent is to make this file both machine parseable and
		human readable. That involves a little more complexity, but
		really isn't all that bad ;-)

		Format is
		<status code>\t<subdir>\t<operation>\t<timestamp>\t<localtime>\t<status>

		status code: (GOOD|WARN|FAIL|ABORT)
			or   START
			or   END (GOOD|WARN|FAIL|ABORT)

		subdir: MUST be a relevant subdirectory in the results,
		or None, which will be represented as '----'

		operation: description of what you ran (e.g. "dbench", or
						"mkfs -t foobar /dev/sda9")

		status: error message or "completed successfully"

		------------------------------------------------------------

		Initial tabs indicate indent levels for grouping, and are
		governed by self.group_level

		multiline messages have secondary lines prefaced by a double
		space ('  ')
		"""

		if subdir:
			if re.search(r'[\n\t]', subdir):
				raise ValueError("Invalid character in subdir string")
			substr = subdir
		else:
			substr = '----'

		if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$',
								status_code):
			raise ValueError("Invalid status code supplied: %s"
								% status_code)
		if not operation:
			operation = '----'
		if re.search(r'[\n\t]', operation):
			raise ValueError("Invalid character in operation string")
		operation = operation.rstrip()
		status = status.rstrip()
		status = re.sub(r"\t", "  ", status)
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)

		# Generate timestamps for inclusion in the logs
		epoch_time = int(time.time())  # seconds since epoch, in UTC
		local_time = time.localtime(epoch_time)
		epoch_time_str = "timestamp=%d" % (epoch_time,)
		local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
					       local_time)

		msg = '\t'.join(str(x) for x in (status_code, substr, operation,
						 epoch_time_str, local_time_str,
						 status))
		msg = '\t' * self.group_level + msg

		self.harness.test_status_detail(status_code, substr,
							operation, status)
		self.harness.test_status(msg)
		print msg
		status_file = os.path.join(self.resultdir, 'status')
		open(status_file, "a").write(msg + "\n")
		if subdir:
			status_file = os.path.join(self.resultdir, subdir, 'status')
			open(status_file, "a").write(msg + "\n")
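
	# A resulting status line looks roughly like the following
	# (illustrative values, prefixed by group_level tabs):
	#
	#	GOOD	dbench	dbench	timestamp=1161234567	localtime=Oct 19 04:09:27	completed successfully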


def runjob(control, cont = False, tag = "default", harness_type = ''):
	"""The main interface to this module

	control
		The control file to use for this job.
	cont
		Whether this is the continuation of a previously started job
	tag
		The job tag string (eg "default")
	harness_type
		An alternative server harness
	"""
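	# Typical invocation from the autotest client wrapper is along
	# the lines of (a sketch; the control file path is made up):
	#
	#	runjob('/usr/local/autotest/control', cont=False)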
	control = os.path.abspath(control)
	state = control + '.state'

	# instantiate the job object ready for the control file.
	myjob = None
	try:
		# Check that the control file is valid
		if not os.path.exists(control):
			raise JobError(control + ": control file not found")

		# When continuing, the job is complete when there is no
		# state file; ensure we don't try and continue.
		if cont and not os.path.exists(state):
			raise JobComplete("all done")
		if not cont and os.path.exists(state):
			os.unlink(state)

		myjob = job(control, tag, cont, harness_type)

		# Load in the user's control file, which may do any one of:
		#  1) execute in toto
		#  2) define steps, and select the first via next_step()
		myjob.step_engine()

	except JobContinue:
		sys.exit(5)

	except JobComplete:
		sys.exit(1)

	except JobError, instance:
		print "JOB ERROR: " + instance.args[0]
		if myjob:
			command = None
			if len(instance.args) > 1:
				command = instance.args[1]
			myjob.group_level = 0
			myjob.record('ABORT', None, command, instance.args[0])
			myjob.record('END ABORT', None, None)
			myjob.complete(1)
		else:
			sys.exit(1)

	except Exception, e:
		msg = str(e) + '\n' + format_error()
		print "JOB ERROR: " + msg
		if myjob:
			myjob.group_level = 0
			myjob.record('ABORT', None, None, msg)
			myjob.record('END ABORT', None, None)
			myjob.complete(1)
		else:
			sys.exit(1)

	# If we get here, then we assume the job is complete and good.
	myjob.group_level = 0
	myjob.record('END GOOD', None, None)
	myjob.complete(0)
620