job.py revision 87cbc7fe4f0fef49c74c9647cc42c50088622449
"""The main job wrapper

This is the core infrastructure.
"""

__author__ = """Copyright Andy Whitcroft, Martin J. Bligh 2006"""

# standard stuff
import os, sys, re, pickle, shutil, time, traceback, types, copy

# autotest stuff
from autotest_lib.client.bin import autotest_utils
from autotest_lib.client.common_lib import error, barrier, logging

import parallel, kernel, xen, test, profilers, filesystem, fd_stack, boottool
import harness, config, sysinfo, cpuset



JOB_PREAMBLE = """
from common.error import *
from autotest_utils import *
"""
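# step_engine() execs JOB_PREAMBLE into the control file's namespace before the
# control file itself runs, so control files can use the error classes and the
# autotest_utils helpers without importing them explicitly.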


class StepError(error.AutotestError):
	pass


class base_job:
	"""The actual job against which we do everything.

	Properties:
		autodir
			The top level autotest directory (/usr/local/autotest).
			Comes from os.environ['AUTODIR'].
		bindir
			<autodir>/bin/
		libdir
			<autodir>/lib/
		testdir
			<autodir>/tests/
		site_testdir
			<autodir>/site_tests/
		profdir
			<autodir>/profilers/
		tmpdir
			<autodir>/tmp/
		resultdir
			<autodir>/results/<jobtag>
		stdout
			fd_stack object for stdout
		stderr
			fd_stack object for stderr
		profilers
			the profilers object for this job
		harness
			the server harness object for this job
		config
			the job configuration for this job
	"""

	DEFAULT_LOG_FILENAME = "status"

	def __init__(self, control, jobtag, cont, harness_type=None,
			use_external_logging = False):
		"""
			control
				The pathname of the control file
			jobtag
				The job tag string (e.g. "default")
			cont
				Whether this is a continuation of an earlier job
			harness_type
				An alternative server harness
		"""
		self.autodir = os.environ['AUTODIR']
		self.bindir = os.path.join(self.autodir, 'bin')
		self.libdir = os.path.join(self.autodir, 'lib')
		self.testdir = os.path.join(self.autodir, 'tests')
		self.site_testdir = os.path.join(self.autodir, 'site_tests')
		self.profdir = os.path.join(self.autodir, 'profilers')
		self.tmpdir = os.path.join(self.autodir, 'tmp')
		self.resultdir = os.path.join(self.autodir, 'results', jobtag)
		self.sysinfodir = os.path.join(self.resultdir, 'sysinfo')
		self.control = os.path.abspath(control)
		self.state_file = self.control + '.state'
		self.__load_state()

		if not cont:
			if os.path.exists(self.tmpdir):
				cmd = ('umount -f %s > /dev/null 2> /dev/null'
						% (self.tmpdir))
				autotest_utils.system(cmd, ignore_status=True)
				autotest_utils.system('rm -rf ' + self.tmpdir)
			os.mkdir(self.tmpdir)

			results = os.path.join(self.autodir, 'results')
			if not os.path.exists(results):
				os.mkdir(results)

			download = os.path.join(self.testdir, 'download')
			if os.path.exists(download):
				autotest_utils.system('rm -rf ' + download)
			os.mkdir(download)

			if os.path.exists(self.resultdir):
				autotest_utils.system('rm -rf '
							+ self.resultdir)
			os.mkdir(self.resultdir)
			os.mkdir(self.sysinfodir)

			os.mkdir(os.path.join(self.resultdir, 'debug'))
			os.mkdir(os.path.join(self.resultdir, 'analysis'))

			shutil.copyfile(self.control,
					os.path.join(self.resultdir, 'control'))


		self.control = control
		self.jobtag = jobtag
		self.log_filename = self.DEFAULT_LOG_FILENAME
		self.container = None

		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)

		self._init_group_level()

		self.config = config.config(self)

		self.harness = harness.select(harness_type, self)

		self.profilers = profilers.profilers(self)

		try:
			tool = self.config_get('boottool.executable')
			self.bootloader = boottool.boottool(tool)
		except:
			pass

		sysinfo.log_per_reboot_data(self.sysinfodir)

		if not cont:
			self.record('START', None, None)
			self._increment_group_level()

		self.harness.run_start()

		if use_external_logging:
			self.enable_external_logging()

		# load the max disk usage rate - default to no monitoring
		self.max_disk_usage_rate = self.get_state('__monitor_disk',
							  default=0.0)


	def monitor_disk_usage(self, max_rate):
		"""\
		Signal that the job should monitor disk space usage on /
		and generate a warning if a test uses up disk space at a
		rate exceeding 'max_rate'.

		Parameters:
		     max_rate - the maximum allowed rate of disk consumption
		                during a test, in MB/hour, or 0 to indicate
		                no limit.
		"""
		self.set_state('__monitor_disk', max_rate)
		self.max_disk_usage_rate = max_rate

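	# Illustrative use of monitor_disk_usage() from a control file (the 500
	# is just an example value, not a recommended default):
	#     job.monitor_disk_usage(500)   # warn if a test consumes > 500 MB/hour on /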

	def relative_path(self, path):
		"""\
		Return a path relative to the job results directory
		"""
		head = len(self.resultdir) + 1     # remove the / in between
		return path[head:]

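	# For example, with the default layout ('/usr/local/autotest/results/<jobtag>'),
	# relative_path(os.path.join(job.resultdir, 'debug', 'stdout')) returns
	# 'debug/stdout'.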

	def control_get(self):
		return self.control


	def control_set(self, control):
		self.control = os.path.abspath(control)


	def harness_select(self, which):
		self.harness = harness.select(which, self)


	def config_set(self, name, value):
		self.config.set(name, value)


	def config_get(self, name):
		return self.config.get(name)

	def setup_dirs(self, results_dir, tmp_dir):
		if not tmp_dir:
			tmp_dir = os.path.join(self.tmpdir, 'build')
		if not os.path.exists(tmp_dir):
			os.mkdir(tmp_dir)
		if not os.path.isdir(tmp_dir):
			e_msg = "Temp dir (%s) is not a dir - args backwards?" % tmp_dir
			raise ValueError(e_msg)

		# We label the first build "build" and then subsequent ones
		# as "build.2", "build.3", etc. Whilst this is a little bit
		# inconsistent, 99.9% of jobs will only have one build
		# (that's not done as kernbench, sparse, or buildtest),
		# so it works out much cleaner. One of life's compromises.
		if not results_dir:
			results_dir = os.path.join(self.resultdir, 'build')
			i = 2
			while os.path.exists(results_dir):
				results_dir = os.path.join(self.resultdir, 'build.%d' % i)
				i += 1
		if not os.path.exists(results_dir):
			os.mkdir(results_dir)

		return (results_dir, tmp_dir)


	def xen(self, base_tree, results_dir = '', tmp_dir = '', leave = False, \
				kjob = None ):
		"""Summon a xen object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'xen'
		return xen.xen(self, base_tree, results_dir, tmp_dir, build_dir, leave, kjob)


	def kernel(self, base_tree, results_dir = '', tmp_dir = '', leave = False):
		"""Summon a kernel object"""
		(results_dir, tmp_dir) = self.setup_dirs(results_dir, tmp_dir)
		build_dir = 'linux'
		return kernel.auto_kernel(self, base_tree, results_dir,
					  tmp_dir, build_dir, leave)


	def barrier(self, *args, **kwds):
		"""Create a barrier object"""
		return barrier.barrier(*args, **kwds)


	def setup_dep(self, deps):
		"""Set up the dependencies for this test.

		deps is a list of libraries required for this test.
		"""
		for dep in deps:
			try:
				os.chdir(os.path.join(self.autodir, 'deps', dep))
				autotest_utils.system('./' + dep + '.py')
			except:
				err = "setting up dependency " + dep + "\n"
				raise error.UnhandledError(err)


	def __runtest(self, url, tag, args, dargs):
		try:
			l = lambda : test.runtest(self, url, tag, args, dargs)
			pid = parallel.fork_start(self.resultdir, l)
			parallel.fork_waitfor(self.resultdir, pid)
		except error.AutotestError:
			raise
		except:
			raise error.UnhandledError('running test ' + \
				self.__class__.__name__ + "\n")


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		url
			url of the test to run
		tag
			optional tag to add to the test name (passed as a
			keyword argument)
		"""

		if not url:
			raise TypeError("Test name is invalid. "
			                "Switched arguments?")
		(group, testname) = test.testname(url)
		tag = dargs.pop('tag', None)
		container = dargs.pop('container', None)
		subdir = testname
		if tag:
			subdir += '.' + tag

		if container:
			cname = container.get('name', None)
			if not cname:   # fall back to the old key name
				cname = container.get('container_name', None)
			mbytes = container.get('mbytes', None)
			if not mbytes:  # fall back to the old key name
				mbytes = container.get('mem', None)
			cpus  = container.get('cpus', None)
			if not cpus:    # fall back to the old key name
				cpus  = container.get('cpu', None)
			root  = container.get('root', None)
			self.new_container(mbytes=mbytes, cpus=cpus,
					root=root, name=cname)
			# We are running in a container now...

		def log_warning(reason):
			self.record("WARN", subdir, testname, reason)
		@disk_usage_monitor.watch(log_warning, "/",
					  self.max_disk_usage_rate)
		def group_func():
			try:
				self.__runtest(url, tag, args, dargs)
			except error.TestNAError, detail:
				self.record('TEST_NA', subdir, testname,
					    str(detail))
				raise
			except Exception, detail:
				self.record('FAIL', subdir, testname,
					    str(detail))
				raise
			else:
				self.record('GOOD', subdir, testname,
					    'completed successfully')

		result, exc_info = self.__rungroup(subdir, group_func)
		if container:
			self.release_container()
		if exc_info and isinstance(exc_info[1], error.TestError):
			return False
		elif exc_info:
			raise exc_info[0], exc_info[1], exc_info[2]
		else:
			return True

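	# Illustrative control-file calls of run_test() (test names and container
	# sizes are just examples, not a fixed list):
	#     job.run_test('sleeptest')
	#     job.run_test('dbench', tag='ext3')
	#     job.run_test('kernbench', container={'name': 'small', 'mbytes': 512,
	#                                          'cpus': 2})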

	def __rungroup(self, name, function, *args, **dargs):
		"""\
		name:
			name of the group
		function:
			subroutine to run
		*args:
			arguments for the function

		Returns a 2-tuple (result, exc_info) where result
		is the return value of function, and exc_info is
		the sys.exc_info() of any exception thrown by the
		function (or None if no exception was raised).
		"""

		result, exc_info = None, None
		try:
			self.record('START', None, name)
			self._increment_group_level()
			result = function(*args, **dargs)
			self._decrement_group_level()
			self.record('END GOOD', None, name)
		except error.TestNAError, e:
			self._decrement_group_level()
			self.record('END TEST_NA', None, name, str(e))
		except Exception, e:
			exc_info = sys.exc_info()
			self._decrement_group_level()
			err_msg = str(e) + '\n' + traceback.format_exc()
			self.record('END FAIL', None, name, err_msg)

		return result, exc_info


	def run_group(self, function, *args, **dargs):
		"""\
		function:
			subroutine to run
		*args:
			arguments for the function
		"""

		# Allow the tag for the group to be specified
		name = function.__name__
		tag = dargs.pop('tag', None)
		if tag:
			name = tag

		result, exc_info = self.__rungroup(name, function,
						   *args, **dargs)

		# if there was a non-TestError exception, raise it
		if exc_info and not isinstance(exc_info[1], error.TestError):
			err = ''.join(traceback.format_exception(*exc_info))
			raise error.TestError(name + ' failed\n' + err)

		# pass back the actual return value from the function
		return result

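	# Illustrative control-file usage of run_group() (the function name is
	# just an example):
	#     def install_and_configure():
	#         ...
	#     job.run_group(install_and_configure, tag='setup')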

	def new_container(self, mbytes=None, cpus=None, root=None, name=None):
		if not autotest_utils.grep('cpuset', '/proc/filesystems'):
			print "Containers not enabled by latest reboot"
			return  # containers weren't enabled in this kernel boot
		pid = os.getpid()
		if not name:
			name = 'test%d' % pid  # make arbitrary unique name
		self.container = cpuset.cpuset(name, job_size=mbytes,
			job_pid=pid, cpus=cpus, root=root)
		# This job's python shell is now running in the new container
		# and all forked test processes will inherit that container


	def release_container(self):
		if self.container:
			self.container.release()
			self.container = None


	def cpu_count(self):
		if self.container:
			return len(self.container.cpus)
		return autotest_utils.count_cpus()  # use total system count


	# Check the passed kernel identifier against the command line
	# and the running kernel; abort the job on mismatch.
	def kernel_check_ident(self, expected_when, expected_id, subdir,
			       type = 'src', patches=[]):
		print (("POST BOOT: checking booted kernel " +
			"mark=%d identity='%s' type='%s'") %
		       (expected_when, expected_id, type))

		running_id = autotest_utils.running_os_ident()

		cmdline = autotest_utils.read_one_line("/proc/cmdline")

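		# The kernel installed by autotest is expected to have appended
		# an IDENT=<mark> argument to its boot command line; pull that
		# mark out so it can be compared against expected_when below.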
		find_sum = re.compile(r'.*IDENT=(\d+)')
		m = find_sum.match(cmdline)
		cmdline_when = -1
		if m:
			cmdline_when = int(m.groups()[0])

		# We have all the facts, see if they indicate we
		# booted the requested kernel or not.
		bad = False
		if ((type == 'src' and expected_id != running_id) or
		    (type == 'rpm' and
		     not running_id.startswith(expected_id + '::'))):
			print "check_kernel_ident: kernel identifier mismatch"
			bad = True
		if expected_when != cmdline_when:
			print "check_kernel_ident: kernel command line mismatch"
			bad = True

		if bad:
			print "   Expected Ident: " + expected_id
			print "    Running Ident: " + running_id
			print "    Expected Mark: %d" % (expected_when)
			print "Command Line Mark: %d" % (cmdline_when)
			print "     Command Line: " + cmdline

			raise error.JobError("boot failure", "reboot.verify")

		kernel_info = {'kernel': expected_id}
		for i, patch in enumerate(patches):
			kernel_info["patch%d" % i] = patch
		self.record('GOOD', subdir, 'reboot.verify', expected_id)
		self._decrement_group_level()
		self.record('END GOOD', subdir, 'reboot',
			    optional_fields=kernel_info)


	def filesystem(self, device, mountpoint = None, loop_size = 0):
		if not mountpoint:
			mountpoint = self.tmpdir
		return filesystem.filesystem(self, device, mountpoint, loop_size)


	def enable_external_logging(self):
		pass


	def disable_external_logging(self):
		pass


	def reboot_setup(self):
		pass


	def reboot(self, tag='autotest'):
		self.reboot_setup()
		self.record('START', None, 'reboot')
		self._increment_group_level()
		self.record('GOOD', None, 'reboot.start')
		self.harness.run_reboot()
		default = self.config_get('boot.set_default')
		if default:
			self.bootloader.set_default(tag)
		else:
			self.bootloader.boot_once(tag)
		cmd = "(sleep 5; reboot) </dev/null >/dev/null 2>&1 &"
		autotest_utils.system(cmd)
		self.quit()


	def noop(self, text):
		print "job: noop: " + text


	def parallel(self, *tasklist):
		"""Run tasks in parallel"""

		pids = []
		old_log_filename = self.log_filename
		for i, task in enumerate(tasklist):
			self.log_filename = old_log_filename + (".%d" % i)
			task_func = lambda: task[0](*task[1:])
			pids.append(parallel.fork_start(self.resultdir,
							task_func))

		old_log_path = os.path.join(self.resultdir, old_log_filename)
		old_log = open(old_log_path, "a")
		exceptions = []
		for i, pid in enumerate(pids):
			# wait for the task to finish
			try:
				parallel.fork_waitfor(self.resultdir, pid)
			except Exception, e:
				exceptions.append(e)
			# copy the logs from the subtask into the main log
			new_log_path = old_log_path + (".%d" % i)
			if os.path.exists(new_log_path):
				new_log = open(new_log_path)
				old_log.write(new_log.read())
				new_log.close()
				old_log.flush()
				os.remove(new_log_path)
		old_log.close()

		self.log_filename = old_log_filename

		# handle any exceptions raised by the parallel tasks
		if exceptions:
			msg = "%d task(s) failed" % len(exceptions)
			raise error.JobError(msg, str(exceptions), exceptions)

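	# Illustrative control-file usage of parallel(): each task is a sequence
	# whose first element is a callable and whose remaining elements are its
	# arguments (the test names here are just examples):
	#     job.parallel([job.run_test, 'dbench'], [job.run_test, 'bonnie'])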

	def quit(self):
		# XXX: should have a better name.
		self.harness.run_pause()
		raise error.JobContinue("more to come")


	def complete(self, status):
		"""Clean up and exit"""
		# We are about to exit 'complete' so clean up the state file.
		try:
			os.unlink(self.state_file)
		except:
			pass

		self.harness.run_complete()
		self.disable_external_logging()
		sys.exit(status)


	def set_state(self, var, val):
		# Deep copies make sure that the state can't be altered
		# without it being re-written.  Perf-wise, deep copies
		# are overshadowed by pickling/loading.
		self.state[var] = copy.deepcopy(val)
		pickle.dump(self.state, open(self.state_file, 'w'))


	def __load_state(self):
		assert not hasattr(self, "state")
		try:
			self.state = pickle.load(open(self.state_file, 'r'))
			self.state_existed = True
		except Exception:
			print "Initializing the state engine."
			self.state = {}
			self.set_state('__steps', []) # writes pickle file
			self.state_existed = False


	def get_state(self, var, default=None):
		if var in self.state or default == None:
			val = self.state[var]
		else:
			val = default
		return copy.deepcopy(val)

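	# set_state()/get_state() persist values in the pickled <control>.state
	# file, so they survive reboots between steps. Illustrative usage (the
	# key name is just an example):
	#     job.set_state('iteration', 3)
	#     job.get_state('iteration', default=0)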

	def __create_step_tuple(self, fn, args, dargs):
		# Legacy code passes in an array where the first arg is
		# the function or its name.
		if isinstance(fn, list):
			assert(len(args) == 0)
			assert(len(dargs) == 0)
			args = fn[1:]
			fn = fn[0]
		# Pickling actual functions is hairy, thus we have to call
		# them by name.  Unfortunately, this means only functions
		# defined globally can be used as a next step.
		if isinstance(fn, types.FunctionType):
			fn = fn.__name__
		if not isinstance(fn, types.StringTypes):
			raise StepError("Next steps must be functions or "
			                "strings containing the function name")
		return (fn, args, dargs)


	def next_step(self, fn, *args, **dargs):
		"""Define the next step"""
		steps = self.get_state('__steps')
		steps.append(self.__create_step_tuple(fn, args, dargs))
		self.set_state('__steps', steps)


	def next_step_prepend(self, fn, *args, **dargs):
		"""Insert a new step, executing first"""
		steps = self.get_state('__steps')
		steps.insert(0, self.__create_step_tuple(fn, args, dargs))
		self.set_state('__steps', steps)


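	# A stepped control file defines step_init() and queues further steps by
	# name; it typically looks something like this (the kernel version and
	# test name are illustrative only):
	#     def step_init():
	#         job.next_step(step_test)
	#         testkernel = job.kernel('2.6.18.tar.bz2')
	#         testkernel.build()
	#         testkernel.boot()      # the job resumes at step_test after reboot
	#
	#     def step_test():
	#         job.run_test('kernbench')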
	def step_engine(self):
		"""The stepping engine -- if the control file defines
		step_init, we use this engine to drive multiple runs,
		executing one step at a time.
		"""

		# Set up the environment and then interpret the control file.
		# Some control files will have code outside of functions,
		# which means we need to have our state engine initialized
		# before reading in the file.
		lcl = {'job': self}
		exec(JOB_PREAMBLE, lcl, lcl)
		execfile(self.control, lcl, lcl)

		# If we loaded in a mid-job state file, then we presumably
		# know what steps we have yet to run.
		if not self.state_existed:
			if lcl.has_key('step_init'):
				self.next_step([lcl['step_init']])

		# Iterate through the steps.  If we reboot, we'll simply
		# continue iterating on the next step.
		while len(self.get_state('__steps')) > 0:
			steps = self.get_state('__steps')
			(fn, args, dargs) = steps.pop(0)
			self.set_state('__steps', steps)

			lcl['__args'] = args
			lcl['__dargs'] = dargs
			exec(fn + "(*__args, **__dargs)", lcl, lcl)


	def _init_group_level(self):
		self.group_level = self.get_state("__group_level", default=0)


	def _increment_group_level(self):
		self.group_level += 1
		self.set_state("__group_level", self.group_level)


	def _decrement_group_level(self):
		self.group_level -= 1
		self.set_state("__group_level", self.group_level)


	def record(self, status_code, subdir, operation, status = '',
		   optional_fields=None):
		"""
		Record job-level status

		The intent is to make this file both machine parseable and
		human readable. That involves a little more complexity, but
		really isn't all that bad ;-)

		Format is <status code>\t<subdir>\t<operation>\t<status>

		status code: (GOOD|WARN|FAIL|ABORT)
			or   START
			or   END (GOOD|WARN|FAIL|ABORT)

		subdir: MUST be a relevant subdirectory in the results,
		or None, which will be represented as '----'

		operation: description of what you ran (e.g. "dbench", or
						"mkfs -t foobar /dev/sda9")

		status: error message or "completed successfully"

		------------------------------------------------------------

		Initial tabs indicate indent levels for grouping and are
		governed by self.group_level.

		Multiline messages have secondary lines prefaced by a double
		space ('  ').
		"""
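		# A successful test, for example, produces a tab-separated line
		# like the following (timestamps illustrative):
		#   GOOD	sleeptest	sleeptest	timestamp=1215781234	localtime=Jul 11 12:00:34	completed successfully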

		if subdir:
			if re.search(r'[\n\t]', subdir):
				raise ValueError("Invalid character in "
						 "subdir string")
			substr = subdir
		else:
			substr = '----'

		if not logging.is_valid_status(status_code):
			raise ValueError("Invalid status code supplied: %s" %
					 status_code)
		if not operation:
			operation = '----'

		if re.search(r'[\n\t]', operation):
			raise ValueError("Invalid character in "
					 "operation string")
		operation = operation.rstrip()

		if not optional_fields:
			optional_fields = {}

		status = status.rstrip()
		status = re.sub(r"\t", "  ", status)
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		status = re.sub(r"\n", "\n" + "\t" * self.group_level + "  ",
				status)

		# Generate timestamps for inclusion in the logs
		epoch_time = int(time.time())  # seconds since epoch, in UTC
		local_time = time.localtime(epoch_time)
		optional_fields["timestamp"] = str(epoch_time)
		optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
							     local_time)

		fields = [status_code, substr, operation]
		fields += ["%s=%s" % x for x in optional_fields.iteritems()]
		fields.append(status)

		msg = '\t'.join(str(x) for x in fields)
		msg = '\t' * self.group_level + msg

		msg_tag = ""
		if "." in self.log_filename:
			msg_tag = self.log_filename.split(".", 1)[1]

		self.harness.test_status_detail(status_code, substr,
						operation, status, msg_tag)
		self.harness.test_status(msg, msg_tag)

		# log to stdout (if enabled)
		#if self.log_filename == self.DEFAULT_LOG_FILENAME:
		print msg

		# log to the "root" status log
		status_file = os.path.join(self.resultdir, self.log_filename)
		open(status_file, "a").write(msg + "\n")

		# log to the subdir status log (if subdir is set)
		if subdir:
			dir = os.path.join(self.resultdir, subdir)
			if not os.path.exists(dir):
				os.mkdir(dir)

			status_file = os.path.join(dir,
						   self.DEFAULT_LOG_FILENAME)
			open(status_file, "a").write(msg + "\n")


class disk_usage_monitor:
	def __init__(self, logging_func, device, max_mb_per_hour):
		self.func = logging_func
		self.device = device
		self.max_mb_per_hour = max_mb_per_hour


	def start(self):
		self.initial_space = autotest_utils.freespace(self.device)
		self.start_time = time.time()


	def stop(self):
		# if no maximum usage rate was set, we don't need to
		# generate any warnings
		if not self.max_mb_per_hour:
			return

		final_space = autotest_utils.freespace(self.device)
		used_space = self.initial_space - final_space
		stop_time = time.time()
		total_time = stop_time - self.start_time
		# clamp the time to a minimum of one minute, to keep extremely
		# short tests from generating false positives due to short,
		# badly timed bursts of activity
		total_time = max(total_time, 60.0)

		# determine the usage rate
		bytes_per_sec = used_space / total_time
		mb_per_sec = bytes_per_sec / 1024**2
		mb_per_hour = mb_per_sec * 60 * 60

		if mb_per_hour > self.max_mb_per_hour:
			msg = ("disk space on %s was consumed at a rate of "
			       "%.2f MB/hour")
			msg %= (self.device, mb_per_hour)
			self.func(msg)


	@classmethod
	def watch(cls, *monitor_args, **monitor_dargs):
		"""Generic decorator to wrap a function call with the
		standard create-monitor -> start -> call -> stop idiom."""
		def decorator(func):
			def watched_func(*args, **dargs):
				monitor = cls(*monitor_args, **monitor_dargs)
				monitor.start()
				try:
					func(*args, **dargs)
				finally:
					monitor.stop()
			return watched_func
		return decorator


def runjob(control, cont = False, tag = "default", harness_type = '',
	   use_external_logging = False):
	"""The main interface to this module

	control
		The control file to use for this job.
	cont
		Whether this is the continuation of a previously started job
	"""
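	# A typical invocation from the client-side autotest wrapper might look
	# like the following (the path is illustrative):
	#     runjob('/usr/local/autotest/control', cont=False, tag='default')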
	control = os.path.abspath(control)
	state = control + '.state'

	# instantiate the job object ready for the control file.
	myjob = None
	try:
		# Check that the control file is valid
		if not os.path.exists(control):
			raise error.JobError(control +
						": control file not found")

		# When continuing, the job is complete when there is no
		# state file; ensure we don't try to continue.
		if cont and not os.path.exists(state):
			raise error.JobComplete("all done")
		if cont == False and os.path.exists(state):
			os.unlink(state)

		myjob = job(control, tag, cont, harness_type,
			    use_external_logging)

		# Load in the user's control file; it may do any one of:
		#  1) execute in toto
		#  2) define steps, and select the first via next_step()
		myjob.step_engine()

	except error.JobContinue:
		sys.exit(5)

	except error.JobComplete:
		sys.exit(1)

	except error.JobError, instance:
		print "JOB ERROR: " + instance.args[0]
		if myjob:
			command = None
			if len(instance.args) > 1:
				command = instance.args[1]
			myjob.record('ABORT', None, command, instance.args[0])
			myjob._decrement_group_level()
			myjob.record('END ABORT', None, None)
			assert(myjob.group_level == 0)
			myjob.complete(1)
		else:
			sys.exit(1)

	except Exception, e:
		msg = str(e) + '\n' + traceback.format_exc()
		print "JOB ERROR: " + msg
		if myjob:
			myjob.record('ABORT', None, None, msg)
			myjob._decrement_group_level()
			myjob.record('END ABORT', None, None)
			assert(myjob.group_level == 0)
			myjob.complete(1)
		else:
			sys.exit(1)

	# If we get here, then we assume the job is complete and good.
	myjob._decrement_group_level()
	myjob.record('END GOOD', None, None)
	assert(myjob.group_level == 0)

	myjob.complete(0)


# site_job.py may be non-existent or empty; make sure that an appropriate
# site_job class is created nevertheless.
try:
	from site_job import site_job
except ImportError:
	class site_job(base_job):
		pass


class job(site_job):
	pass