job.py revision adff6ca0cb1fd0eb9d715fb45ec33708a02579ba
1"""The main job wrapper
2
3This is the core infrastructure.
4"""
5
6__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""
7
8# standard stuff
9import os, sys, re, pickle, shutil, time, traceback
10# autotest stuff
11from autotest_utils import *
12from parallel import *
13from common.error import *
14from common import barrier
15import kernel, xen, test, profilers, filesystem, fd_stack, boottool
16import harness, config
17import sysinfo
18import cpuset
19
20class job:
21	"""The actual job against which we do everything.
22
23	Properties:
24		autodir
25			The top level autotest directory (/usr/local/autotest).
26			Comes from os.environ['AUTODIR'].
27		bindir
28			<autodir>/bin/
29		testdir
30			<autodir>/tests/
31		profdir
32			<autodir>/profilers/
33		tmpdir
34			<autodir>/tmp/
35		resultdir
36			<autodir>/results/<jobtag>
37		stdout
38			fd_stack object for stdout
39		stderr
40			fd_stack object for stderr
41		profilers
42			the profilers object for this job
43		harness
44			the server harness object for this job
45		config
46			the job configuration for this job
47	"""
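
	# After __init__ (for, e.g., jobtag "default") the result tree looks
	# roughly like this -- an illustrative sketch of what the code below
	# creates, not an exhaustive listing:
	#	<autodir>/results/default/control	copy of the control file
	#	<autodir>/results/default/status	job-level status log (see record())
	#	<autodir>/results/default/sysinfo/	per-reboot system info
	#	<autodir>/results/default/debug/
	#	<autodir>/results/default/analysis/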

	DEFAULT_LOG_FILENAME = "status"

	def __init__(self, control, jobtag, cont, harness_type=None):
		"""
			control
				Pathname of the control file
			jobtag
				The job tag string (eg "default")
			cont
				Whether this is the continuation of this job
			harness_type
				An alternative server harness
		"""
		self.autodir = os.environ['AUTODIR']
		self.bindir = os.path.join(self.autodir, 'bin')
		self.testdir = os.path.join(self.autodir, 'tests')
		self.profdir = os.path.join(self.autodir, 'profilers')
		self.tmpdir = os.path.join(self.autodir, 'tmp')
		self.resultdir = os.path.join(self.autodir, 'results', jobtag)
		self.sysinfodir = os.path.join(self.resultdir, 'sysinfo')
		self.control = os.path.abspath(control)

		if not cont:
			if os.path.exists(self.tmpdir):
				system('umount -f %s > /dev/null 2> /dev/null' %
				       self.tmpdir, ignorestatus=True)
				system('rm -rf ' + self.tmpdir)
			os.mkdir(self.tmpdir)

			results = os.path.join(self.autodir, 'results')
			if not os.path.exists(results):
				os.mkdir(results)

			download = os.path.join(self.testdir, 'download')
			if os.path.exists(download):
				system('rm -rf ' + download)
			os.mkdir(download)

			if os.path.exists(self.resultdir):
				system('rm -rf ' + self.resultdir)
			os.mkdir(self.resultdir)
			os.mkdir(self.sysinfodir)

			os.mkdir(os.path.join(self.resultdir, 'debug'))
			os.mkdir(os.path.join(self.resultdir, 'analysis'))

			shutil.copyfile(self.control,
					os.path.join(self.resultdir, 'control'))

		self.control = control
		self.jobtag = jobtag
		self.log_filename = self.DEFAULT_LOG_FILENAME

		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)
		self.group_level = 0

		self.config = config.config(self)

		self.harness = harness.select(harness_type, self)

		self.profilers = profilers.profilers(self)

		try:
			tool = self.config_get('boottool.executable')
			self.bootloader = boottool.boottool(tool)
		except:
			pass

		sysinfo.log_per_reboot_data(self.sysinfodir)

		if not cont:
			self.record('START', None, None)
		self.group_level = 1

		self.harness.run_start()


	def relative_path(self, path):
		"""\
		Return a path relative to the job results directory
		"""
		head = len(self.resultdir) + 1     # remove the / in between
		return path[head:]


	def control_get(self):
		return self.control


	def control_set(self, control):
		self.control = os.path.abspath(control)


	def harness_select(self, which):
		self.harness = harness.select(which, self)


	def config_set(self, name, value):
		self.config.set(name, value)


	def config_get(self, name):
		return self.config.get(name)

	def setup_dirs(self, results_dir, tmp_dir):
		if not tmp_dir:
			tmp_dir = os.path.join(self.tmpdir, 'build')
		if not os.path.exists(tmp_dir):
			os.mkdir(tmp_dir)
		if not os.path.isdir(tmp_dir):
			e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
			raise ValueError(e_msg)

		# We label the first build "build" and then subsequent ones
		# as "build.2", "build.3", etc. Whilst this is a little bit
		# inconsistent, 99.9% of jobs will only have one build
		# (that's not done as kernbench, sparse, or buildtest),
		# so it works out much cleaner. One of life's compromises.
		if not results_dir:
			results_dir = os.path.join(self.resultdir, 'build')
			i = 2
			while os.path.exists(results_dir):
				results_dir = os.path.join(self.resultdir, 'build.%d' % i)
				i += 1
		if not os.path.exists(results_dir):
			os.mkdir(results_dir)

		return (results_dir, tmp_dir)


	def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False,
				kjob = None):
		"""Summon a xen object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'xen'
		return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir,
			       leave, kjob)


	def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
		"""Summon a kernel object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'linux'
		return kernel.auto_kernel(self, base_tree, results_dir,
					  tmp_dir, build_dir, leave)
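
	# Illustrative control file usage; the tarball path is a placeholder,
	# not something this module ships:
	#	mykernel = job.kernel('/usr/src/linux-2.6.18.tar.bz2')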


	def barrier(self, *args, **kwds):
		"""Create a barrier object"""
		return barrier.barrier(*args, **kwds)


	def setup_dep(self, deps):
		"""Set up the dependencies for this test.

		deps is a list of libraries required for this test.
		"""
		for dep in deps:
			try:
				os.chdir(os.path.join(self.autodir, 'deps', dep))
				system('./' + dep + '.py')
			except:
				error = "setting up dependency " + dep + "\n"
				raise UnhandledError(error)
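
	# Illustrative call (the dependency name is a placeholder); each dep
	# must live in <autodir>/deps/<dep>/ and provide a <dep>.py script:
	#	job.setup_dep(['libaio'])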


	def __runtest(self, url, tag, args, dargs):
		try:
			l = lambda : test.runtest(self, url, tag, args, dargs)
			pid = fork_start(self.resultdir, l)
			fork_waitfor(self.resultdir, pid)
		except AutotestError:
			raise
		except:
			raise UnhandledError('running test ' + \
				self.__class__.__name__ + "\n")


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		url
			url of the test to run
		tag
			tag to add to testname (optional, passed via dargs)
		"""

		if not url:
			raise TypeError("Test name is invalid. Switched arguments?")
		(group, testname) = test.testname(url)
		tag = dargs.pop('tag', None)
		self.container = None
		container = dargs.pop('container', None)
		subdir = testname
		if tag:
			subdir += '.' + tag

		if container:
			container_name = container.pop('container_name', None)
			cpu = container.get('cpu', None)
			root_container = container.get('root', 'sys')
			if not container_name:
				container_name = testname
			if grep('cpusets', '/proc/filesystems'):
				self.container = cpuset.cpuset(container_name,
				    container['mem'],
				    os.getpid(),
				    root = root_container,
				    cpus = cpu)
			# We are running in a container now...

		def group_func():
			try:
				self.__runtest(url, tag, args, dargs)
			except Exception, detail:
				self.record('FAIL', subdir, testname,
					    str(detail))
				raise
			else:
				self.record('GOOD', subdir, testname,
					    'completed successfully')
		result, exc_info = self.__rungroup(subdir, group_func)
		if self.container:
			self.container.release()
			self.container = None

		if exc_info and isinstance(exc_info[1], TestError):
			return False
		elif exc_info:
			raise exc_info[0], exc_info[1], exc_info[2]
		else:
			return True
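
	# Illustrative control file call (the test name and tag are
	# placeholders):
	#	job.run_test('sleeptest', tag='quick')
	# This runs the test in its own group, records START/END status lines,
	# and returns True on success or False if the test raised a TestError.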


	def __rungroup(self, name, function, *args, **dargs):
		"""\
		name:
			name of the group
		function:
			subroutine to run
		*args:
			arguments for the function

		Returns a 2-tuple (result, exc_info) where result is the
		return value of function, and exc_info is the sys.exc_info()
		of the exception the function raised (or None if it did not
		raise).
		"""

		result, exc_info = None, None
		try:
			self.record('START', None, name)
			self.group_level += 1
			result = function(*args, **dargs)
			self.group_level -= 1
			self.record('END GOOD', None, name)
		except Exception, e:
			exc_info = sys.exc_info()
			self.group_level -= 1
			err_msg = str(e) + '\n' + format_error()
			self.record('END FAIL', None, name, err_msg)

		return result, exc_info


	def run_group(self, function, *args, **dargs):
		"""\
		function:
			subroutine to run
		*args:
			arguments for the function
		"""

		# Allow the tag for the group to be specified
		name = function.__name__
		tag = dargs.pop('tag', None)
		if tag:
			name = tag

		result, exc_info = self.__rungroup(name, function,
						   *args, **dargs)

		# if there was a non-TestError exception, raise it
		if exc_info and not isinstance(exc_info[1], TestError):
			err = ''.join(traceback.format_exception(*exc_info))
			raise TestError(name + ' failed\n' + err)

		# pass back the actual return value from the function
		return result
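
	# Illustrative use (the function is a placeholder defined by the
	# control file):
	#	def install_and_check():
	#		...
	#	job.run_group(install_and_check, tag='install')
	# Any non-TestError exception from the function is re-raised as a
	# TestError; otherwise the function's return value is passed back.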


	# Check the passed kernel identifier against the command line
	# and the running kernel, abort the job on mismatch.
	def kernel_check_ident(self, expected_when, expected_id, expected_cl, subdir, type = 'src'):
		print "POST BOOT: checking booted kernel mark=%d identity='%s' changelist=%s type='%s'" \
			% (expected_when, expected_id, expected_cl, type)

		running_id = running_os_ident()

		cmdline = read_one_line("/proc/cmdline")

		find_sum = re.compile(r'.*IDENT=(\d+)')
		m = find_sum.match(cmdline)
		cmdline_when = -1
		if m:
			cmdline_when = int(m.groups()[0])

		cl_re = re.compile(r'\d{7,}')
		cl_match = cl_re.search(system_output('uname -v').split()[1])
		if cl_match:
			current_cl = cl_match.group()
		else:
			current_cl = None

		# We have all the facts, see if they indicate we
		# booted the requested kernel or not.
		bad = False
		if (type == 'src' and expected_id != running_id or
		    type == 'rpm' and not running_id.startswith(expected_id + '::')):
			print "check_kernel_ident: kernel identifier mismatch"
			bad = True
		if expected_when != cmdline_when:
			print "check_kernel_ident: kernel command line mismatch"
			bad = True
		if expected_cl and current_cl and str(expected_cl) != current_cl:
			print 'check_kernel_ident: kernel changelist mismatch'
			bad = True

		if bad:
			print "   Expected Ident: " + expected_id
			print "    Running Ident: " + running_id
			print "    Expected Mark: %d" % (expected_when)
			print "Command Line Mark: %d" % (cmdline_when)
			print "   Expected P4 CL: %s" % expected_cl
			print "            P4 CL: %s" % current_cl
			print "     Command Line: " + cmdline

			raise JobError("boot failure", "reboot.verify")

		self.record('GOOD', subdir, 'reboot.verify')


	def filesystem(self, device, mountpoint = None, loop_size = 0):
		if not mountpoint:
			mountpoint = self.tmpdir
		return filesystem.filesystem(self, device, mountpoint, loop_size)


	def reboot(self, tag='autotest'):
		self.record('GOOD', None, 'reboot.start')
		self.harness.run_reboot()
		default = self.config_get('boot.set_default')
		if default:
			self.bootloader.set_default(tag)
		else:
			self.bootloader.boot_once(tag)
		system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
		self.quit()


	def noop(self, text):
		print "job: noop: " + text


	def parallel(self, *tasklist):
		"""Run tasks in parallel"""

		pids = []
		old_log_filename = self.log_filename
		for i, task in enumerate(tasklist):
			self.log_filename = old_log_filename + (".%d" % i)
			task_func = lambda: task[0](*task[1:])
			pids.append(fork_start(self.resultdir, task_func))

		old_log_path = os.path.join(self.resultdir, old_log_filename)
		old_log = open(old_log_path, "a")
		exceptions = []
		for i, pid in enumerate(pids):
			# wait for the task to finish
			try:
				fork_waitfor(self.resultdir, pid)
			except Exception, e:
				exceptions.append(e)
			# copy the logs from the subtask into the main log
			new_log_path = old_log_path + (".%d" % i)
			if os.path.exists(new_log_path):
				new_log = open(new_log_path)
				old_log.write(new_log.read())
				new_log.close()
				old_log.flush()
				os.remove(new_log_path)
		old_log.close()

		self.log_filename = old_log_filename

		# handle any exceptions raised by the parallel tasks
		if exceptions:
			msg = "%d task(s) failed" % len(exceptions)
			raise JobError(msg, str(exceptions), exceptions)
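
	# Illustrative use (the task functions are placeholders); each task is
	# a sequence whose first element is the callable and the rest are its
	# arguments:
	#	job.parallel([build_step], [test_step, 'some_arg'])
	# Per-task status logs (status.0, status.1, ...) are merged back into
	# the main status file once all tasks have finished.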


	def quit(self):
		# XXX: should have a better name.
		self.harness.run_pause()
		raise JobContinue("more to come")


	def complete(self, status):
		"""Clean up and exit"""
		# We are about to exit 'complete' so clean up the control file.
		try:
			os.unlink(self.control + '.state')
		except:
			pass
		self.harness.run_complete()
		sys.exit(status)


	steps = []
	def next_step(self, step):
		"""Define the next step"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.append(step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def next_step_prepend(self, step):
		"""Insert a new step, executing first"""
		if not isinstance(step[0], basestring):
			step[0] = step[0].__name__
		self.steps.insert(0, step)
		pickle.dump(self.steps, open(self.control + '.state', 'w'))


	def step_engine(self):
		"""The stepping engine -- if the control file defines
		step_init we will be using this engine to drive multiple runs.
		"""
		lcl = dict({'job': self})

		preamble = """
from common.error import *
from autotest_utils import *
"""
		exec(preamble, lcl, lcl)
		execfile(self.control, lcl, lcl)

		state = self.control + '.state'
		# If there is a mid-job state file load that in and continue
		# where it indicates.  Otherwise start stepping at the passed
		# entry.
		try:
			self.steps = pickle.load(open(state, 'r'))
		except:
			if lcl.has_key('step_init'):
				self.next_step([lcl['step_init']])

		# Run the step list.
		while len(self.steps) > 0:
			step = self.steps.pop(0)
			pickle.dump(self.steps, open(state, 'w'))

			cmd = step.pop(0)
			lcl['__args'] = step
			exec(cmd + "(*__args)", lcl, lcl)
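
	# A control file that needs to survive a reboot typically defines
	# step_init() and queues named, top-level functions as later steps.
	# Illustrative sketch (the step and test names are placeholders):
	#
	#	def step_one():
	#		job.next_step([step_two])
	#		job.reboot()
	#
	#	def step_two():
	#		job.run_test('sleeptest')
	#
	#	def step_init():
	#		job.next_step([step_one])
	#
	# The queued steps are pickled to <control>.state, so the engine can
	# resume from that file when the job is continued after the reboot.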


	def record(self, status_code, subdir, operation, status = ''):
		"""
		Record job-level status

		The intent is to make this file both machine parseable and
		human readable. That involves a little more complexity, but
		really isn't all that bad ;-)

		Format is
		<status code>\t<subdir>\t<operation>\t<timestamp>\t<localtime>\t<status>

		status code: (GOOD|WARN|FAIL|ABORT)
			or   START
			or   END (GOOD|WARN|FAIL|ABORT)

		subdir: MUST be a relevant subdirectory in the results,
		or None, which will be represented as '----'

		operation: description of what you ran (e.g. "dbench", or
						"mkfs -t foobar /dev/sda9")

		timestamp/localtime: generated automatically, written as
		"timestamp=<seconds since epoch>" and
		"localtime=<human readable local time>"

		status: error message or "completed successfully"

		------------------------------------------------------------

		Initial tabs indicate indent levels for grouping and are
		governed by self.group_level

		Multiline messages have secondary lines prefaced by a double
		space ('  ')
		"""
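
		# A top-level (group_level 1) record therefore looks roughly like
		# (tabs shown as \t; the test name and timestamps are illustrative):
		#	\tGOOD\tdbench\tdbench\ttimestamp=1200000000\tlocaltime=Jan 10 12:00:00\tcompleted successfully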

		if subdir:
			if re.search(r'[\n\t]', subdir):
				raise ValueError("Invalid character in subdir string")
			substr = subdir
		else:
			substr = '----'

		if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$',
								status_code):
			raise ValueError("Invalid status code supplied: %s" % status_code)
		if not operation:
			operation = '----'
		if re.search(r'[\n\t]', operation):
			raise ValueError("Invalid character in operation string")
		operation = operation.rstrip()
		status = status.rstrip()
		status = re.sub(r"\t", "  ", status)
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ", status)

		# Generate timestamps for inclusion in the logs
		epoch_time = int(time.time())  # seconds since epoch, in UTC
		local_time = time.localtime(epoch_time)
		epoch_time_str = "timestamp=%d" % (epoch_time,)
		local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
					       local_time)

		msg = '\t'.join(str(x) for x in (status_code, substr, operation,
						 epoch_time_str, local_time_str,
						 status))
		msg = '\t' * self.group_level + msg

		msg_tag = ""
		if "." in self.log_filename:
			msg_tag = self.log_filename.split(".", 1)[1]

		self.harness.test_status_detail(status_code, substr, operation,
						status, msg_tag)
		self.harness.test_status(msg, msg_tag)

		# log to stdout (if enabled)
		#if self.log_filename == self.DEFAULT_LOG_FILENAME:
		print msg

		# log to the "root" status log
		status_file = os.path.join(self.resultdir, self.log_filename)
		open(status_file, "a").write(msg + "\n")

		# log to the subdir status log (if subdir is set)
		if subdir:
			dir = os.path.join(self.resultdir, subdir)
			if not os.path.exists(dir):
				os.mkdir(dir)

			status_file = os.path.join(dir,
						   self.DEFAULT_LOG_FILENAME)
			open(status_file, "a").write(msg + "\n")


def runjob(control, cont = False, tag = "default", harness_type = ''):
	"""The main interface to this module

	control
		The control file to use for this job.
	cont
		Whether this is the continuation of a previously started job
	tag
		The job tag string (eg "default")
	harness_type
		An alternative server harness
	"""
	control = os.path.abspath(control)
	state = control + '.state'

	# instantiate the job object ready for the control file.
	myjob = None
	try:
		# Check that the control file is valid
		if not os.path.exists(control):
			raise JobError(control + ": control file not found")

		# When continuing, the job is complete when there is no
		# state file; ensure we don't try and continue.
		if cont and not os.path.exists(state):
			raise JobComplete("all done")
		if cont == False and os.path.exists(state):
			os.unlink(state)

		myjob = job(control, tag, cont, harness_type)

		# Load in the user's control file, which may do any one of:
		#  1) execute in toto
		#  2) define steps, and select the first via next_step()
		myjob.step_engine()

	except JobContinue:
		sys.exit(5)

	except JobComplete:
		sys.exit(1)

	except JobError, instance:
		print "JOB ERROR: " + instance.args[0]
		if myjob:
			command = None
			if len(instance.args) > 1:
				command = instance.args[1]
			myjob.group_level = 0
			myjob.record('ABORT', None, command, instance.args[0])
			myjob.record('END ABORT', None, None)
			myjob.complete(1)
		else:
			sys.exit(1)

	except Exception, e:
		msg = str(e) + '\n' + format_error()
		print "JOB ERROR: " + msg
		if myjob:
			myjob.group_level = 0
			myjob.record('ABORT', None, None, msg)
			myjob.record('END ABORT', None, None)
			myjob.complete(1)
		else:
			sys.exit(1)

	# If we get here, then we assume the job is complete and good.
	myjob.group_level = 0
	myjob.record('END GOOD', None, None)
	myjob.complete(0)

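# Illustrative invocation (the control file path is a placeholder):
#	runjob('/usr/local/autotest/control', cont=False, tag='default')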