job.py revision 6b504ff04e21af766da0cfbcde8e1e352a474dfa
1"""The main job wrapper
2
3This is the core infrastructure.
4"""
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""
7
# standard stuff
import os, sys, re, pickle, shutil, time, traceback
# autotest stuff
from autotest_utils import *
from parallel import *
from common.error import *
from common import barrier
import kernel, xen, test, profilers, filesystem, fd_stack, boottool
import harness, config
import sysinfo
import cpuset

class job:
	"""The actual job against which we do everything.

	Properties:
		autodir
			The top level autotest directory (/usr/local/autotest).
			Comes from os.environ['AUTODIR'].
		bindir
			<autodir>/bin/
		testdir
			<autodir>/tests/
		profdir
			<autodir>/profilers/
		tmpdir
			<autodir>/tmp/
		resultdir
			<autodir>/results/<jobtag>
		stdout
			fd_stack object for stdout
		stderr
			fd_stack object for stderr
		profilers
			the profilers object for this job
		harness
			the server harness object for this job
		config
			the job configuration for this job
	"""

	def __init__(self, control, jobtag, cont, harness_type=None):
		"""
			control
				Pathname of the control file
			jobtag
				The job tag string (eg "default")
			cont
				Whether this is the continuation of a
				previously started job
			harness_type
				An alternative server harness
		"""
		self.autodir = os.environ['AUTODIR']
		self.bindir = os.path.join(self.autodir, 'bin')
		self.testdir = os.path.join(self.autodir, 'tests')
		self.profdir = os.path.join(self.autodir, 'profilers')
		self.tmpdir = os.path.join(self.autodir, 'tmp')
		self.resultdir = os.path.join(self.autodir, 'results', jobtag)
		self.control = os.path.abspath(control)

		if not cont:
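			# A fresh (non-continued) run starts from a clean
			# slate: force-unmount anything a previous run may
			# have left mounted under tmpdir (job.filesystem()
			# mounts there by default) before wiping it.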
			if os.path.exists(self.tmpdir):
				system('umount -f %s > /dev/null 2> /dev/null' %
						self.tmpdir, ignorestatus=True)
				system('rm -rf ' + self.tmpdir)
			os.mkdir(self.tmpdir)

			results = os.path.join(self.autodir, 'results')
			if not os.path.exists(results):
				os.mkdir(results)

			download = os.path.join(self.testdir, 'download')
			if os.path.exists(download):
				system('rm -rf ' + download)
			os.mkdir(download)

			if os.path.exists(self.resultdir):
				system('rm -rf ' + self.resultdir)
			os.mkdir(self.resultdir)

			os.mkdir(os.path.join(self.resultdir, 'debug'))
			os.mkdir(os.path.join(self.resultdir, 'analysis'))
			os.mkdir(os.path.join(self.resultdir, 'sysinfo'))

			shutil.copyfile(self.control,
					os.path.join(self.resultdir, 'control'))

		self.control = control
		self.jobtag = jobtag

		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)
		self.group_level = 0

		self.config = config.config(self)

		self.harness = harness.select(harness_type, self)

		self.profilers = profilers.profilers(self)

		try:
			tool = self.config_get('boottool.executable')
			self.bootloader = boottool.boottool(tool)
		except:
			pass

		# log "before each step" sysinfo
		pwd = os.getcwd()
		try:
			os.chdir(os.path.join(self.resultdir, 'sysinfo'))
			sysinfo.before_each_step()
		finally:
			os.chdir(pwd)

		if not cont:
			self.record('START', None, None)
		self.group_level = 1

		self.harness.run_start()


	def relative_path(self, path):
		"""\
		Return a path relative to the job results directory.
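
		For example, with resultdir '/usr/local/autotest/results/default',
		relative_path('/usr/local/autotest/results/default/debug/stdout')
		returns 'debug/stdout'.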
132		"""
133		head = len(self.resultdir) + 1     # remove the / inbetween
134		return path[head:]


	def control_get(self):
		return self.control


	def control_set(self, control):
		self.control = os.path.abspath(control)


	def harness_select(self, which):
		self.harness = harness.select(which, self)


	def config_set(self, name, value):
		self.config.set(name, value)


	def config_get(self, name):
		return self.config.get(name)

	def setup_dirs(self, results_dir, tmp_dir):
		if not tmp_dir:
			tmp_dir = os.path.join(self.tmpdir, 'build')
		if not os.path.exists(tmp_dir):
			os.mkdir(tmp_dir)
		if not os.path.isdir(tmp_dir):
			raise ValueError("Temp dir (%s) is not a dir - args backwards?"
								% tmp_dir)

		# We label the first build "build" and then subsequent ones
		# as "build.2", "build.3", etc. Whilst this is a little bit
		# inconsistent, 99.9% of jobs will only have one build
		# (that's not done as kernbench, sparse, or buildtest),
		# so it works out much cleaner. One of life's compromises.
		if not results_dir:
			results_dir = os.path.join(self.resultdir, 'build')
			i = 2
			while os.path.exists(results_dir):
				results_dir = os.path.join(self.resultdir, 'build.%d' % i)
				i += 1
		if not os.path.exists(results_dir):
			os.mkdir(results_dir)

		return (results_dir, tmp_dir)


	def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False,
				kjob = None):
		"""Summon a xen object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'xen'
		return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
				leave, kjob)


	def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
		"""Summon a kernel object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'linux'
		return kernel.auto_kernel(self, base_tree, results_dir,
					  tmp_dir, build_dir, leave)

	def barrier(self, *args, **kwds):
		"""Create a barrier object"""
		return barrier.barrier(*args, **kwds)


	def setup_dep(self, deps):
		"""Set up the dependencies for this test.

		deps is a list of libraries required for this test.
		"""
		for dep in deps:
			try:
				os.chdir(os.path.join(self.autodir, 'deps', dep))
				system('./' + dep + '.py')
			except:
				error = "setting up dependency " + dep + "\n"
				raise UnhandledError(error)


	def __runtest(self, url, tag, args, dargs):
		try:
			l = lambda : test.runtest(self, url, tag, args, dargs)
			pid = fork_start(self.resultdir, l)
			fork_waitfor(self.resultdir, pid)
		except AutotestError:
			raise
		except:
			raise UnhandledError('running test ' + \
				self.__class__.__name__ + "\n")


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		url
			url of the test to run
		tag (optional keyword)
			tag to add to the test name
		container (optional keyword)
			dict of cpuset container parameters
		Remaining arguments are passed through to the test.
		"""

		if not url:
			raise ValueError("Test name is invalid. Switched arguments?")
		(group, testname) = test.testname(url)
		tag = dargs.pop('tag', None)
		self.container = None
		container = dargs.pop('container', None)
		subdir = testname
		if tag:
			subdir += '.' + tag

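		# Optionally confine the test to a cpuset container,
		# restricting it to the requested cpus and memory.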
		if container:
			container_name = container.pop('container_name', None)
			cpu = container.get('cpu', None)
			root_container = container.get('root', 'sys')
			if not container_name:
				container_name = testname
			if grep('cpusets', '/proc/filesystems'):
				self.container = cpuset.cpuset(container_name,
				    container['mem'],
				    os.getpid(),
				    root = root_container,
				    cpus = cpu)
			# We are running in a container now...

		def group_func():
			try:
				self.__runtest(url, tag, args, dargs)
			except Exception, detail:
				self.record('FAIL', subdir, testname,
					    str(detail))
				raise
			else:
				self.record('GOOD', subdir, testname,
					    'completed successfully')

		result, exc_info = self.__rungroup(subdir, group_func)
		if self.container:
			self.container.release()
			self.container = None

		if exc_info and isinstance(exc_info[1], TestError):
			return False
		elif exc_info:
			raise exc_info[0], exc_info[1], exc_info[2]
		else:
			return True


	def __rungroup(self, name, function, *args, **dargs):
		"""\
		name:
			name of the group
		function:
			subroutine to run
		*args:
			arguments for the function

		Returns a 2-tuple (result, exc_info) where result
		is the return value of function, and exc_info is
		the sys.exc_info() of the exception thrown by the
		function (which may be None).
		"""

		result, exc_info = None, None
		try:
			self.record('START', None, name)
			self.group_level += 1
			result = function(*args, **dargs)
			self.group_level -= 1
			self.record('END GOOD', None, name)
		except Exception, e:
			exc_info = sys.exc_info()
			self.group_level -= 1
			err_msg = str(e) + '\n' + format_error()
			self.record('END FAIL', None, name, err_msg)

		return result, exc_info


	def run_group(self, function, *args, **dargs):
		"""\
		function:
			subroutine to run
		*args:
			arguments for the function
		"""

		# Allow the tag for the group to be specified
		name = function.__name__
		tag = dargs.pop('tag', None)
		if tag:
			name = tag

		result, exc_info = self.__rungroup(name, function,
						   *args, **dargs)

		# if there was a non-TestError exception, raise it
		if exc_info and not isinstance(exc_info[1], TestError):
			err = ''.join(traceback.format_exception(*exc_info))
			raise TestError(name + ' failed\n' + err)

		# pass back the actual return value from the function
		return result


	# Check the passed kernel identifier against the command line
	# and the running kernel; abort the job on mismatch.
	def kernel_check_ident(self, expected_when, expected_id, expected_cl,
				subdir, type = 'src'):
		print "POST BOOT: checking booted kernel " \
			"mark=%d identity='%s' changelist=%s type='%s'" \
			% (expected_when, expected_id, expected_cl, type)

		running_id = running_os_ident()

		cmdline = read_one_line("/proc/cmdline")

		find_sum = re.compile(r'.*IDENT=(\d+)')
		m = find_sum.match(cmdline)
		cmdline_when = -1
		if m:
			cmdline_when = int(m.groups()[0])

		cl_re = re.compile(r'\d{7,}')
		cl_match = cl_re.search(system_output('uname -v').split()[1])
		if cl_match:
			current_cl = cl_match.group()
		else:
			current_cl = None

		# We have all the facts, see if they indicate we
		# booted the requested kernel or not.
		bad = False
		if ((type == 'src' and expected_id != running_id) or
		    (type == 'rpm' and
		     not running_id.startswith(expected_id + '::'))):
			print "check_kernel_ident: kernel identifier mismatch"
			bad = True
		if expected_when != cmdline_when:
			print "check_kernel_ident: kernel command line mismatch"
			bad = True
		if expected_cl and current_cl and str(expected_cl) != current_cl:
			print 'check_kernel_ident: kernel changelist mismatch'
			bad = True

		if bad:
			print "   Expected Ident: " + expected_id
			print "    Running Ident: " + running_id
			print "    Expected Mark: %d" % (expected_when)
			print "Command Line Mark: %d" % (cmdline_when)
			print "   Expected P4 CL: %s" % expected_cl
			print "            P4 CL: %s" % current_cl
			print "     Command Line: " + cmdline

			raise JobError("boot failure", "reboot.verify")

		self.record('GOOD', subdir, 'reboot.verify')


	def filesystem(self, device, mountpoint = None, loop_size = 0):
		if not mountpoint:
			mountpoint = self.tmpdir
		return filesystem.filesystem(self, device, mountpoint, loop_size)


	def reboot(self, tag='autotest'):
		self.record('GOOD', None, 'reboot.start')
		self.harness.run_reboot()
		default = self.config_get('boot.set_default')
		if default:
			self.bootloader.set_default(tag)
		else:
			self.bootloader.boot_once(tag)
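		# Delay the actual reboot so this process has time to tell
		# the harness and exit via quit() below before the machine
		# goes down.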
408		system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
409		self.quit()
410

	def noop(self, text):
		print "job: noop: " + text


	# Job control primitives.

	def __parallel_execute(self, func, *args):
		func(*args)


	def parallel(self, *tasklist):
		"""Run tasks in parallel"""

		pids = []
		for task in tasklist:
			pids.append(fork_start(self.resultdir,
					lambda: self.__parallel_execute(*task)))
		for pid in pids:
			fork_waitfor(self.resultdir, pid)


	def quit(self):
		# XXX: should have a better name.
		self.harness.run_pause()
		raise JobContinue("more to come")


	def complete(self, status):
		"""Clean up and exit"""
		# We are about to exit 'complete' so clean up the control file.
		try:
			os.unlink(self.control + '.state')
		except:
			pass
		self.harness.run_complete()
		sys.exit(status)


	steps = []
	def next_step(self, step):
		"""Define the next step"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.append(step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def next_step_prepend(self, step):
		"""Insert a new step, executing first"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.insert(0, step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


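	# A control file drives the step engine by defining step
	# functions and queueing them, e.g. (illustrative):
	#	def step_init():
	#		job.next_step([step_one])
	#	def step_one():
	#		job.run_test('sleeptest')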
	def step_engine(self):
		"""The stepping engine -- if the control file defines
		step_init we will be using this engine to drive multiple
		runs, executing any pending steps in order.
		"""
		lcl = dict({'job': self})

		preamble = """
from common.error import *
from autotest_utils import *
"""
		exec(preamble, lcl, lcl)
		execfile(self.control, lcl, lcl)

		state = self.control + '.state'
		# If there is a mid-job state file load that in and continue
		# where it indicates.  Otherwise start stepping at the passed
		# entry.
		try:
			self.steps = pickle.load(open(state, 'r'))
		except:
			if lcl.has_key('step_init'):
				self.next_step([lcl['step_init']])

		# Run the step list.
		while len(self.steps) > 0:
			step = self.steps.pop(0)
			pickle.dump(self.steps, open(state, 'w'))
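			# The remaining steps are persisted above before this
			# step runs, so a crash or reboot mid-step resumes at
			# the following step rather than repeating this one.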

			cmd = step.pop(0)
			lcl['__args'] = step
			exec(cmd + "(*__args)", lcl, lcl)


	def record(self, status_code, subdir, operation, status = ''):
		"""
		Record job-level status

		The intent is to make this file both machine parseable and
		human readable. That involves a little more complexity, but
		really isn't all that bad ;-)

		Format is <status code>\t<subdir>\t<operation>\t<status>

		status code: (GOOD|WARN|FAIL|ABORT)
			or   START
			or   END (GOOD|WARN|FAIL|ABORT)

		subdir: MUST be a relevant subdirectory in the results,
		or None, which will be represented as '----'

		operation: description of what you ran (e.g. "dbench", or
						"mkfs -t foobar /dev/sda9")

		status: error message or "completed successfully"

		------------------------------------------------------------

		Initial tabs indicate indent levels for grouping, and are
		governed by self.group_level

		multiline messages have secondary lines prefaced by a double
		space ('  ')
		"""

		if subdir:
			if re.search(r'[\n\t]', subdir):
				raise ValueError("Invalid character in subdir string")
			substr = subdir
		else:
			substr = '----'

		if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$',
								status_code):
			raise ValueError("Invalid status code supplied: %s"
								% status_code)
		if not operation:
			operation = '----'
		if re.search(r'[\n\t]', operation):
			raise ValueError("Invalid character in operation string")
		operation = operation.rstrip()
		status = status.rstrip()
		status = re.sub(r"\t", "  ", status)
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)

		# Generate timestamps for inclusion in the logs
		epoch_time = int(time.time())  # seconds since epoch, in UTC
		local_time = time.localtime(epoch_time)
		epoch_time_str = "timestamp=%d" % (epoch_time,)
		local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
					       local_time)

		msg = '\t'.join(str(x) for x in (status_code, substr, operation,
						 epoch_time_str, local_time_str,
						 status))
		msg = '\t' * self.group_level + msg

		self.harness.test_status_detail(status_code, substr,
							operation, status)
		self.harness.test_status(msg)
		print msg
		status_file = os.path.join(self.resultdir, 'status')
		open(status_file, "a").write(msg + "\n")
		if subdir:
			status_file = os.path.join(self.resultdir, subdir, 'status')
			open(status_file, "a").write(msg + "\n")


def runjob(control, cont = False, tag = "default", harness_type = ''):
	"""The main interface to this module

	control
		The control file to use for this job.
	cont
		Whether this is the continuation of a previously started job
	tag
		The job tag string (eg "default")
	harness_type
		An alternative server harness
	"""
	control = os.path.abspath(control)
	state = control + '.state'

	# instantiate the job object ready for the control file.
	myjob = None
	try:
		# Check that the control file is valid
		if not os.path.exists(control):
			raise JobError(control + ": control file not found")

		# When continuing, the job is complete when there is no
		# state file; ensure we don't try and continue.
		if cont and not os.path.exists(state):
			raise JobComplete("all done")
		if not cont and os.path.exists(state):
			os.unlink(state)

		myjob = job(control, tag, cont, harness_type)

		# Load in the user's control file; it may do any one of:
		#  1) execute in toto
		#  2) define steps, and select the first via next_step()
		myjob.step_engine()

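	# JobContinue is raised by job.quit() (e.g. around a reboot); the
	# distinct exit code lets the invoking harness know the job
	# intends to resume rather than having finished.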
	except JobContinue:
		sys.exit(5)

	except JobComplete:
		sys.exit(1)

	except JobError, instance:
		print "JOB ERROR: " + instance.args[0]
		if myjob:
			command = None
			if len(instance.args) > 1:
				command = instance.args[1]
			myjob.group_level = 0
			myjob.record('ABORT', None, command, instance.args[0])
			myjob.record('END ABORT', None, None)
			myjob.complete(1)
		else:
			sys.exit(1)

	except Exception, e:
		msg = str(e) + '\n' + format_error()
		print "JOB ERROR: " + msg
		if myjob:
			myjob.group_level = 0
			myjob.record('ABORT', None, None, msg)
			myjob.record('END ABORT', None, None)
			myjob.complete(1)
		else:
			sys.exit(1)

	# If we get here, then we assume the job is complete and good.
	myjob.group_level = 0
	myjob.record('END GOOD', None, None)
	myjob.complete(0)