# job.py revision cfc6dd3f4fa1b68921c785e720574168486e49ff
"""The main job wrapper

This is the core infrastructure.
"""

__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""

# standard stuff
import os, sys, re, pickle, shutil, time, traceback
# autotest stuff
from autotest_utils import *
from parallel import *
from error import *
import kernel, xen, test, profilers, barrier, filesystem, fd_stack, boottool
import harness, config
import sysinfo
17
class job:
	"""The actual job against which we do everything.

	Properties:
		autodir
			The top level autotest directory (/usr/local/autotest).
			Comes from os.environ['AUTODIR'].
		bindir
			<autodir>/bin/
		testdir
			<autodir>/tests/
		profdir
			<autodir>/profilers/
		tmpdir
			<autodir>/tmp/
		resultdir
			<autodir>/results/<jobtag>
		stdout
			fd_stack object for stdout
		stderr
			fd_stack object for stderr
		profilers
			the profilers object for this job
		harness
			the server harness object for this job
		config
			the job configuration for this job
	"""

	def __init__(self, control, jobtag, cont, harness_type=None):
		"""
			control
				The control file (pathname of)
			jobtag
				The job tag string (eg "default")
			cont
				If this is the continuation of this job
			harness_type
				An alternative server harness
		"""
		self.autodir = os.environ['AUTODIR']
		self.bindir = os.path.join(self.autodir, 'bin')
		self.testdir = os.path.join(self.autodir, 'tests')
		self.profdir = os.path.join(self.autodir, 'profilers')
		self.tmpdir = os.path.join(self.autodir, 'tmp')
		self.resultdir = os.path.join(self.autodir, 'results', jobtag)

		if not cont:
			# Fresh (non-continuation) job: wipe any stale
			# tmp/download/result directories left over from a
			# previous run.  The umount is best-effort, in case
			# a filesystem test left something mounted there.
			if os.path.exists(self.tmpdir):
				system('umount -f %s > /dev/null 2> /dev/null'%\
					 	self.tmpdir, ignorestatus=True)
				system('rm -rf ' + self.tmpdir)
			os.mkdir(self.tmpdir)

			results = os.path.join(self.autodir, 'results')
			if not os.path.exists(results):
				os.mkdir(results)

			download = os.path.join(self.testdir, 'download')
			if os.path.exists(download):
				system('rm -rf ' + download)
			os.mkdir(download)

			if os.path.exists(self.resultdir):
				system('rm -rf ' + self.resultdir)
			os.mkdir(self.resultdir)

			os.mkdir(os.path.join(self.resultdir, 'debug'))
			os.mkdir(os.path.join(self.resultdir, 'analysis'))
			os.mkdir(os.path.join(self.resultdir, 'sysinfo'))

			# Keep a copy of the control file with the results.
			shutil.copyfile(control, os.path.join(self.resultdir, 'control'))

		self.control = control
		self.jobtag = jobtag

		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)
		# Indent level used by record() when writing the status log.
		self.group_level = 0

		self.config = config.config(self)

		self.harness = harness.select(harness_type, self)

		self.profilers = profilers.profilers(self)

		# NOTE(review): the bare except silently leaves
		# self.bootloader unset when boottool cannot be created;
		# a later reboot() would then raise AttributeError.
		try:
			tool = self.config_get('boottool.executable')
			self.bootloader = boottool.boottool(tool)
		except:
			pass

		# log "before each step" sysinfo
		pwd = os.getcwd()
		try:
			os.chdir(os.path.join(self.resultdir, 'sysinfo'))
			sysinfo.before_each_step()
		finally:
			os.chdir(pwd)

		if not cont:
			self.record('START', None, None)
			self.harness.run_start()
		self.group_level = 1


	def relative_path(self, path):
		"""\
		Return a path relative to the job results directory
		"""
		head = len(self.resultdir) + 1     # remove the / in between
		return path[head:]


	def control_get(self):
		"""Return the pathname of this job's control file."""
		return self.control


	def harness_select(self, which):
		"""Switch this job over to the named server harness."""
		self.harness = harness.select(which, self)


	def config_set(self, name, value):
		"""Set a job configuration value."""
		self.config.set(name, value)


	def config_get(self, name):
		"""Look up a job configuration value."""
		return self.config.get(name)

	def setup_dirs(self, results_dir, tmp_dir):
		"""Ensure results_dir and tmp_dir exist, supplying defaults
		under resultdir/tmpdir when either is empty.

		Returns the (results_dir, tmp_dir) pair actually to be used.
		"""
		if not tmp_dir:
			tmp_dir = os.path.join(self.tmpdir, 'build')
		if not os.path.exists(tmp_dir):
			os.mkdir(tmp_dir)
		if not os.path.isdir(tmp_dir):
			# NOTE(review): raising a string is a TypeError on
			# Python >= 2.6; this should be an exception class.
			raise "Temp dir (%s) is not a dir - args backwards?" \
								% self.tmpdir

		# We label the first build "build" and then subsequent ones
		# as "build.2", "build.3", etc. Whilst this is a little bit
		# inconsistent, 99.9% of jobs will only have one build
		# (that's not done as kernbench, sparse, or buildtest),
		# so it works out much cleaner. One of life's compromises.
		if not results_dir:
			results_dir = os.path.join(self.resultdir, 'build')
			i = 2
			while os.path.exists(results_dir):
				results_dir = os.path.join(self.resultdir, 'build.%d' % i)
				i += 1
		if not os.path.exists(results_dir):
			os.mkdir(results_dir)

		return (results_dir, tmp_dir)


	def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
				kjob = None ):
		"""Summon a xen object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'xen'
		return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob)


	def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
		"""Summon a kernel object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'linux'
		return kernel.auto_kernel(self, base_tree, results_dir,
					  tmp_dir, build_dir, leave)


	def barrier(self, *args):
		"""Create a barrier object"""
		return barrier.barrier(*args)


	def setup_dep(self, deps):
		"""Set up the dependencies for this test.

		deps is a list of libraries required for this test.
		"""
		# Each dependency lives in <autodir>/deps/<dep> and is
		# built/installed by running its <dep>.py script in place.
		for dep in deps:
			try:
				os.chdir(os.path.join(self.autodir, 'deps', dep))
				system('./' + dep + '.py')
			except:
				error = "setting up dependency " + dep + "\n"
				raise UnhandledError(error)


	def __runtest(self, url, tag, args, dargs):
		"""Run the test in a forked child (so a crashing test cannot
		take down the job), wrapping any non-autotest exception in
		UnhandledError."""
		try:
			l = lambda : test.runtest(self, url, tag, args, dargs)
			pid = fork_start(self.resultdir, l)
			fork_waitfor(self.resultdir, pid)
		except AutotestError:
			raise
		except:
			raise UnhandledError('running test ' + \
				self.__class__.__name__ + "\n")


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		tag
			tag to add to testname
		url
			url of the test to run

		Returns True on success, False if the test raised a
		TestError; any other exception is re-raised.
		"""

		if not url:
			# NOTE(review): string raise; TypeError on Python >= 2.6.
			raise "Test name is invalid. Switched arguments?"
		(group, testname) = test.testname(url)
		tag = dargs.pop('tag', None)
		subdir = testname
		if tag:
			subdir += '.' + tag

		def group_func():
			# Record per-test GOOD/FAIL, then let __rungroup
			# handle the surrounding START/END lines.
			try:
				self.__runtest(url, tag, args, dargs)
			except Exception, detail:
				self.record('FAIL', subdir, testname,
					    str(detail))
				raise
			else:
				self.record('GOOD', subdir, testname,
					    'completed successfully')
		result, exc_info = self.__rungroup(subdir, group_func)

		if exc_info and isinstance(exc_info[1], TestError):
			return False
		elif exc_info:
			raise exc_info[0], exc_info[1], exc_info[2]
		else:
			return True


	def __rungroup(self, name, function, *args, **dargs):
		"""\
		name:
		        name of the group
		function:
			subroutine to run
		*args:
			arguments for the function

		Returns a 2-tuple (result, exc_info) where result
		is the return value of function, and exc_info is
		the sys.exc_info() of the exception thrown by the
		function (which may be None).
		"""

		result, exc_info = None, None
		try:
			self.record('START', None, name)
			self.group_level += 1
			result = function(*args, **dargs)
			self.group_level -= 1
			self.record('END GOOD', None, name)
		except Exception, e:
			exc_info = sys.exc_info()
			self.group_level -= 1
			self.record('END FAIL', None, name, format_error())

		return result, exc_info


	def run_group(self, function, *args, **dargs):
		"""\
		function:
			subroutine to run
		*args:
			arguments for the function
		"""

		# Allow the tag for the group to be specified
		name = function.__name__
		tag = dargs.pop('tag', None)
		if tag:
			name = tag

		result, exc_info = self.__rungroup(name, function,
						   *args, **dargs)

		# If the function raised a TestError, re-raise it wrapped
		# with the formatted traceback.
		# NOTE(review): this is the opposite polarity of
		# run_test(), which re-raises non-TestError exceptions and
		# swallows TestError; here non-TestError exceptions are
		# silently dropped -- confirm which is intended.
		if exc_info and isinstance(exc_info[1], TestError):
			err = ''.join(traceback.format_exception(*exc_info))
			raise TestError(name + ' failed\n' + err)

		# pass back the actual return value from the function
		return result


	# Check the passed kernel identifier against the command line
	# and the running kernel, abort the job on mismatch.
	def kernel_check_ident(self, expected_when, expected_id, expected_cl, subdir, type = 'src'):
		"""Verify (after a reboot) that the expected kernel is
		running; raise JobError on mismatch, otherwise record a
		GOOD reboot.verify status."""
		print "POST BOOT: checking booted kernel mark=%d identity='%s' changelist=%s type='%s'" \
			% (expected_when, expected_id, expected_cl, type)

		running_id = running_os_ident()

		cmdline = read_one_line("/proc/cmdline")

		# The boot entry was marked with IDENT=<when> on the
		# kernel command line; recover that mark (-1 if absent).
		find_sum = re.compile(r'.*IDENT=(\d+)')
		m = find_sum.match(cmdline)
		cmdline_when = -1
		if m:
			cmdline_when = int(m.groups()[0])

		# Extract a changelist number (7+ digits) from the second
		# word of 'uname -v', if one is present.
		cl_re = re.compile(r'\d{7,}')
		cl_match = cl_re.search(system_output('uname -v').split()[1])
		if cl_match:
			current_cl = cl_match.group()
		else:
			current_cl = None

		# We have all the facts, see if they indicate we
		# booted the requested kernel or not.
		bad = False
		if (type == 'src' and expected_id != running_id or
		    type == 'rpm' and not running_id.startswith(expected_id + '::')):
			print "check_kernel_ident: kernel identifier mismatch"
			bad = True
		if expected_when != cmdline_when:
			print "check_kernel_ident: kernel command line mismatch"
			bad = True
		if expected_cl and current_cl and str(expected_cl) != current_cl:
			print 'check_kernel_ident: kernel changelist mismatch'
			bad = True

		if bad:
			print "   Expected Ident: " + expected_id
			print "    Running Ident: " + running_id
			print "    Expected Mark: %d" % (expected_when)
			print "Command Line Mark: %d" % (cmdline_when)
			print "   Expected P4 CL: %s" % expected_cl
			print "            P4 CL: %s" % current_cl
			print "     Command Line: " + cmdline

			raise JobError("boot failure", "reboot.verify")

		self.record('GOOD', subdir, 'reboot.verify')


	def filesystem(self, device, mountpoint = None, loop_size = 0):
		"""Summon a filesystem object, defaulting the mountpoint
		to the job tmpdir."""
		if not mountpoint:
			mountpoint = self.tmpdir
		return filesystem.filesystem(self, device, mountpoint,loop_size)


	def reboot(self, tag='autotest'):
		"""Reboot into the boot entry named by tag, then suspend
		the job via quit() so it can continue after the reboot."""
		self.record('GOOD', None, 'reboot.start')
		self.harness.run_reboot()
		default = self.config_get('boot.set_default')
		if default:
			self.bootloader.set_default(tag)
		else:
			self.bootloader.boot_once(tag)
		# Detach the actual reboot so this process can still run
		# quit() and pause cleanly before the machine goes down.
		system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
		self.quit()


	def noop(self, text):
		"""A do-nothing step; handy for exercising the step engine."""
		print "job: noop: " + text


	# Job control primitives.

	def __parallel_execute(self, func, *args):
		# Trampoline used by parallel() inside each forked child.
		func(*args)


	def parallel(self, *tasklist):
		"""Run tasks in parallel"""

		# Fork one child per task, then reap them all.
		pids = []
		for task in tasklist:
			pids.append(fork_start(self.resultdir,
					lambda: self.__parallel_execute(*task)))
		for pid in pids:
			fork_waitfor(self.resultdir, pid)


	def quit(self):
		"""Pause the job via the harness and unwind back to
		runjob() by raising JobContinue."""
		# XXX: should have a better name.
		self.harness.run_pause()
		raise JobContinue("more to come")


	def complete(self, status):
		"""Clean up and exit"""
		# We are about to exit 'complete' so clean up the control file.
		try:
			os.unlink(self.control + '.state')
		except:
			pass
		self.harness.run_complete()
		sys.exit(status)


	# Pending step list, persisted to <control>.state so a job can
	# resume after a reboot.
	# NOTE(review): this is a class attribute, shared by every job
	# instance in the process until step_engine()/next_step rebinds
	# or mutates it -- confirm that is intended.
	steps = []
	def next_step(self, step):
		"""Define the next step"""
		# Steps are stored as [name, args...]; callables are
		# reduced to their name for pickling.
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.append(step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def next_step_prepend(self, step):
		"""Insert a new step, executing first"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.insert(0, step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def step_engine(self):
		"""the stepping engine -- if the control file defines
		step_init we will be using this engine to drive multiple runs.
		"""
		"""Do the next step"""
		# Namespace in which the control file executes; steps run
		# with the same globals/locals so they can see its names.
		lcl = dict({'job': self})

		# (note: rebinds the builtin 'str' locally)
		str = """
from error import *
from autotest_utils import *
"""
		exec(str, lcl, lcl)
		execfile(self.control, lcl, lcl)

		state = self.control + '.state'
		# If there is a mid-job state file load that in and continue
		# where it indicates.  Otherwise start stepping at the passed
		# entry.
		try:
			self.steps = pickle.load(open(state, 'r'))
		except:
			if lcl.has_key('step_init'):
				self.next_step([lcl['step_init']])

		# Run the step list.
		while len(self.steps) > 0:
			step = self.steps.pop(0)
			# Persist the remaining steps before running this
			# one, so a reboot mid-step resumes correctly.
			pickle.dump(self.steps, open(state, 'w'))

			cmd = step.pop(0)
			lcl['__args'] = step
			exec(cmd + "(*__args)", lcl, lcl)


	def record(self, status_code, subdir, operation, status = ''):
		"""
		Record job-level status

		The intent is to make this file both machine parseable and
		human readable. That involves a little more complexity, but
		really isn't all that bad ;-)

		Format is <status code>\t<subdir>\t<operation>\t<status>

		status code: (GOOD|WARN|FAIL|ABORT)
			or   START
			or   END (GOOD|WARN|FAIL|ABORT)

		subdir: MUST be a relevant subdirectory in the results,
		or None, which will be represented as '----'

		operation: description of what you ran (e.g. "dbench", or
						"mkfs -t foobar /dev/sda9")

		status: error message or "completed successfully"

		------------------------------------------------------------

		Initial tabs indicate indent levels for grouping, and is
		governed by self.group_level

		multiline messages have secondary lines prefaced by a double
		space ('  ')
		"""

		if subdir:
			if re.match(r'[\n\t]', subdir):
				raise "Invalid character in subdir string"
			substr = subdir
		else:
			substr = '----'

		if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
								status_code):
			raise "Invalid status code supplied: %s" % status_code
		if not operation:
			operation = '----'
		if re.match(r'[\n\t]', operation):
			raise "Invalid character in operation string"
		operation = operation.rstrip()
		status = status.rstrip()
		status = re.sub(r"\t", "  ", status)
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)

		# Generate timestamps for inclusion in the logs
		epoch_time = int(time.time())  # seconds since epoch, in UTC
		local_time = time.localtime(epoch_time)
		epoch_time_str = "timestamp=%d" % (epoch_time,)
		local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
					       local_time)

		msg = '\t'.join(str(x) for x in (status_code, substr, operation,
						 epoch_time_str, local_time_str,
						 status))
		msg = '\t' * self.group_level + msg

		self.harness.test_status_detail(status_code, substr,
							operation, status)
		self.harness.test_status(msg)
		print msg
		# Append to the job-wide status file, and also to the
		# per-test status file when a subdir was given.
		status_file = os.path.join(self.resultdir, 'status')
		open(status_file, "a").write(msg + "\n")
		if subdir:
			status_file = os.path.join(self.resultdir, subdir, 'status')
			open(status_file, "a").write(msg + "\n")
544
545
546def runjob(control, cont = False, tag = "default", harness_type = ''):
547	"""The main interface to this module
548
549	control
550		The control file to use for this job.
551	cont
552		Whether this is the continuation of a previously started job
553	"""
554	control = os.path.abspath(control)
555	state = control + '.state'
556
557	# instantiate the job object ready for the control file.
558	myjob = None
559	try:
560		# Check that the control file is valid
561		if not os.path.exists(control):
562			raise JobError(control + ": control file not found")
563
564		# When continuing, the job is complete when there is no
565		# state file, ensure we don't try and continue.
566		if cont and not os.path.exists(state):
567			sys.exit(1)
568		if cont == False and os.path.exists(state):
569			os.unlink(state)
570
571		myjob = job(control, tag, cont, harness_type)
572
573		# Load in the users control file, may do any one of:
574		#  1) execute in toto
575		#  2) define steps, and select the first via next_step()
576		myjob.step_engine()
577
578	except JobContinue:
579		sys.exit(5)
580
581	except JobError, instance:
582		print "JOB ERROR: " + instance.args[0]
583		if myjob:
584			command = None
585			if len(instance.args) > 1:
586				command = instance.args[1]
587			myjob.group_level = 0
588			myjob.record('ABORT', None, command, instance.args[0])
589			myjob.record('END ABORT', None, None)
590			myjob.complete(1)
591
592	except Exception, e:
593		msg = format_error()
594		print "JOB ERROR: " + msg
595		if myjob:
596			myjob.group_level = 0
597			myjob.record('ABORT', None, None, msg)
598			myjob.record('END ABORT', None, None)
599			myjob.complete(1)
600
601	# If we get here, then we assume the job is complete and good.
602	myjob.group_level = 0
603	myjob.record('END GOOD', None, None)
604	myjob.complete(0)
605