server_job.py revision b03ba64bfac2f4268198c84974663923ca456621
1"""
2The main job wrapper for the server side.
3
4This is the core infrastructure. Derived from the client side job.py
5
6Copyright Martin J. Bligh, Andy Whitcroft 2007
7"""
8
9__author__ = """
10Martin J. Bligh <mbligh@google.com>
11Andy Whitcroft <apw@shadowen.org>
12"""
13
14import os, sys, re, time, select, subprocess, traceback
15import test
16from utils import *
17from common.error import *
18
19# this magic incantation should give us access to a client library
20server_dir = os.path.dirname(__file__)
21client_dir = os.path.join(server_dir, "..", "client", "bin")
22sys.path.append(client_dir)
23import fd_stack
24sys.path.pop()
25
26# load up a control segment
27# these are all stored in <server_dir>/control_segments
def load_control_segment(name):
	"""Return the text of the control segment `name`.

	Segments live in <server_dir>/control_segments. A missing segment
	yields the empty string so callers can concatenate optional
	site-specific hooks unconditionally.
	"""
	server_dir = os.path.dirname(os.path.abspath(__file__))
	script_file = os.path.join(server_dir, "control_segments", name)
	if os.path.exists(script_file):
		# open() instead of the py2-only file() builtin, and close
		# the handle explicitly instead of leaking it to the GC
		f = open(script_file)
		try:
			return f.read()
		finally:
			f.close()
	else:
		return ""
35
36
# Preamble prepended to every exec'd control segment: imports the
# server-side libraries, binds the current job object onto the Autotest
# and SSHHost classes, and records the machine list in .machines for
# multi-machine jobs. ('\\n' stays escaped here so the newline appears
# only when the segment itself is executed.)
preamble = """\
import os, sys

import hosts, autotest, kvm, git, standalone_profiler
import source_kernel, rpm_kernel, deb_kernel, git_kernel
from common.error import *
from common import barrier
from subcommand import *
from utils import run, get_tmp_dir, sh_escape

autotest.Autotest.job = job
hosts.SSHHost.job = job
barrier = barrier.barrier

if len(machines) > 1:
	open('.machines', 'w').write('\\n'.join(machines) + '\\n')
"""
54
# Control segment used to run a client-side control file: it pushes the
# control (bound as `control` in the exec namespace) to each machine in
# parallel via autotest.Autotest.
client_wrapper = """
at = autotest.Autotest()

def run_client(machine):
	host = hosts.SSHHost(machine)
	at.run(control, host=host)

parallel_simple(run_client, machines)
"""
64
# Control segment that collects crashdumps newer than `test_start_time`
# from every machine after a run (see server_job.run's finally block).
crashdumps = """
def crashdumps(machine):
	host = hosts.SSHHost(machine, initialize=False)
	host.get_crashdumps(test_start_time)

parallel_simple(crashdumps, machines, log=False)
"""
72
# Control segment that reboots all machines in parallel.
reboot_segment="""\
def reboot(machine):
	host = hosts.SSHHost(machine, initialize=False)
	host.reboot()

parallel_simple(reboot, machines, log=False)
"""
80
# Control segment that reinstalls every machine in parallel (used for
# the install_before/install_after options of server_job.run).
install="""\
def install(machine):
	host = hosts.SSHHost(machine, initialize=False)
	host.machine_install()

parallel_simple(install, machines, log=False)
"""
88
# load up the verifier control segment, with an optional site-specific hook
# (missing segments contribute the empty string, so the site hook is optional)
verify = load_control_segment("site_verify")
verify += load_control_segment("verify")

# load up the repair control segment, with an optional site-specific hook
repair = load_control_segment("site_repair")
repair += load_control_segment("repair")
96
97
# load up site-specific code for generating site-specific job data
try:
	import site_job
	get_site_job_data = site_job.get_site_job_data
	# drop the module reference; only the function is needed
	del site_job
except ImportError:
	# by default provide a stub that generates no site data
	def get_site_job_data(job):
		return {}
107
108
class server_job:
	"""The actual job against which we do everything.

	Properties:
		autodir
			The top level autotest directory (/usr/local/autotest).
		serverdir
			<autodir>/server/
		clientdir
			<autodir>/client/
		conmuxdir
			<autodir>/conmux/
		testdir
			<autodir>/server/tests/
		control
			the control file for this job
	"""

	def __init__(self, control, args, resultdir, label, user, machines,
								client = False):
		"""
			control
				The control file (pathname of)
			args
				args to pass to the control file
			resultdir
				where to throw the results
			label
				label for the job
			user
				Username for the job (email address)
			machines
				list of machine hostnames the job runs against
			client
				True if a client-side control file
		"""
		path = os.path.dirname(sys.modules['server_job'].__file__)
		self.autodir = os.path.abspath(os.path.join(path, '..'))
		self.serverdir = os.path.join(self.autodir, 'server')
		self.testdir   = os.path.join(self.serverdir, 'tests')
		self.tmpdir    = os.path.join(self.serverdir, 'tmp')
		self.conmuxdir = os.path.join(self.autodir, 'conmux')
		self.clientdir = os.path.join(self.autodir, 'client')
		if control:
			self.control = open(control, 'r').read()
			# normalize away DOS line endings in the control file
			self.control = re.sub('\r', '', self.control)
		else:
			self.control = None
		self.resultdir = resultdir
		if not os.path.exists(resultdir):
			os.mkdir(resultdir)
		self.debugdir = os.path.join(resultdir, 'debug')
		if not os.path.exists(self.debugdir):
			os.mkdir(self.debugdir)
		self.status = os.path.join(resultdir, 'status')
		self.label = label
		self.user = user
		self.args = args
		self.machines = machines
		self.client = client
		# prefix prepended to every status log line; grows by one
		# tab per nesting level (see run_group and record)
		self.record_prefix = ''
		# file-like objects polled for console warnings by
		# _read_warnings(); loggers register themselves here
		self.warning_loggers = set()

		# stacks allowing stdout/stderr to be redirected and restored
		self.stdout = fd_stack.fd_stack(1, sys.stdout)
		self.stderr = fd_stack.fd_stack(2, sys.stderr)

		# start every job with a fresh status file
		if os.path.exists(self.status):
			os.unlink(self.status)
		job_data = { 'label' : label, 'user' : user,
					'hostname' : ','.join(machines) }
		job_data.update(get_site_job_data(self))
		write_keyval(self.resultdir, job_data)


	def verify(self):
		"""Run the verify control segment against self.machines.

		Records an ABORT in the status log and re-raises on any
		failure.
		"""
		if not self.machines:
			raise AutoservError('No machines specified to verify')
		try:
			namespace = {'machines' : self.machines, 'job' : self}
			exec(preamble + verify, namespace, namespace)
		except Exception, e:
			msg = 'Verify failed\n' + str(e) + '\n' + format_error()
			self.record('ABORT', None, None, msg)
			raise


	def repair(self):
		"""Run the repair control segment, then re-verify the machines."""
		if not self.machines:
			raise AutoservError('No machines specified to repair')
		namespace = {'machines' : self.machines, 'job' : self}
		# no matter what happens during repair, go on to try to reverify
		try:
			exec(preamble + repair, namespace, namespace)
		except Exception, exc:
			# NOTE(review): "occured" typo in this runtime message;
			# left unchanged in case anything matches on it
			print 'Exception occured during repair'
			traceback.print_exc()
		self.verify()


	def run(self, reboot = False, install_before = False,
		install_after = False, collect_crashdumps = True,
		namespace = {}):
		"""Execute the job's control file.

		reboot
			reboot all machines after the run
		install_before / install_after
			run the machine_install segment before/after the job
		collect_crashdumps
			gather crashdumps newer than the start time when done
		namespace
			extra names made visible to the exec'd control file
			(copied immediately, so the mutable default is safe)
		"""
		# use a copy so changes don't affect the original dictionary
		namespace = namespace.copy()
		machines = self.machines

		self.aborted = False
		namespace['machines'] = machines
		namespace['args'] = self.args
		namespace['job'] = self
		test_start_time = int(time.time())

		os.chdir(self.resultdir)

		status_log = os.path.join(self.resultdir, 'status.log')
		try:
			if install_before and machines:
				exec(preamble + install, namespace, namespace)
			if self.client:
				# client-side control file: save it, and run it
				# remotely via the client_wrapper segment
				namespace['control'] = self.control
				open('control', 'w').write(self.control)
				open('control.srv', 'w').write(client_wrapper)
				server_control = client_wrapper
			else:
				open('control.srv', 'w').write(self.control)
				server_control = self.control
			exec(preamble + server_control, namespace, namespace)

		finally:
			# cleanup segments run even if the control file raised
			if machines and collect_crashdumps:
				namespace['test_start_time'] = test_start_time
				exec(preamble + crashdumps,
				     namespace, namespace)
			if reboot and machines:
				exec(preamble + reboot_segment,
				     namespace, namespace)
			if install_after and machines:
				exec(preamble + install, namespace, namespace)


	def run_test(self, url, *args, **dargs):
		"""Summon a test object and run it.

		tag
			tag to add to testname
		url
			url of the test to run
		"""

		(group, testname) = test.testname(url)
		tag = None
		subdir = testname

		# an explicit tag distinguishes repeat runs of the same test
		# by suffixing the results subdirectory
		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				subdir += '.' + tag

		try:
			test.runtest(self, url, tag, args, dargs)
			self.record('GOOD', subdir, testname, 'completed successfully')
		except Exception, detail:
			self.record('FAIL', subdir, testname, format_error())


	def run_group(self, function, *args, **dargs):
		"""\
		Run a subroutine as a named, indented group in the status log.

		function:
			subroutine to run
		*args:
			arguments for the function

		Returns the function's result, or None if it raised.
		"""

		result = None
		name = function.__name__

		# Allow the tag for the group to be specified.
		if dargs.has_key('tag'):
			tag = dargs['tag']
			del dargs['tag']
			if tag:
				name = tag

		# if tag:
		#	name += '.' + tag
		old_record_prefix = self.record_prefix
		try:
			try:
				self.record('START', None, name)
				# indent all records emitted inside the group
				self.record_prefix += '\t'
				result = function(*args, **dargs)
				self.record_prefix = old_record_prefix
				self.record('END GOOD', None, name)
			except:
				self.record_prefix = old_record_prefix
				self.record('END FAIL', None, name, format_error())
		# We don't want to raise up an error higher if it's just
		# a TestError - we want to carry on to other tests. Hence
		# this outer try/except block.
		except TestError:
			pass
		except:
			raise TestError(name + ' failed\n' + format_error())

		return result


	def record(self, status_code, subdir, operation, status=''):
		"""
		Record job-level status

		The intent is to make this file both machine parseable and
		human readable. That involves a little more complexity, but
		really isn't all that bad ;-)

		Format is <status code>\t<subdir>\t<operation>\t<status>

		status code: (GOOD|WARN|FAIL|ABORT)
			or   START
			or   END (GOOD|WARN|FAIL|ABORT)

		subdir: MUST be a relevant subdirectory in the results,
		or None, which will be represented as '----'

		operation: description of what you ran (e.g. "dbench", or
						"mkfs -t foobar /dev/sda9")

		status: error message or "completed successfully"

		------------------------------------------------------------

		Initial tabs indicate indent levels for grouping, and is
		governed by self.record_prefix

		multiline messages have secondary lines prefaced by a double
		space ('  ')

		Executing this method will trigger the logging of all new
		warnings to date from the various console loggers.
		"""
		# poll all our warning loggers for new warnings
		warnings = self._read_warnings()
		for timestamp, msg in warnings:
			self.__record("WARN", None, None, msg, timestamp)

		# write out the actual status log line
		self.__record(status_code, subdir, operation, status)


	def _read_warnings(self):
		"""Drain all pending warning lines from the registered
		warning loggers, returning (timestamp, message) tuples
		sorted by timestamp. Closed loggers are unregistered."""
		warnings = []
		while True:
			# pull in a line of output from every logger that has
			# output ready to be read
			loggers, _, _ = select.select(self.warning_loggers,
						      [], [], 0)
			closed_loggers = set()
			for logger in loggers:
				line = logger.readline()
				# record any broken pipes (aka line == empty)
				if len(line) == 0:
					closed_loggers.add(logger)
					continue
				# each warning line is "<timestamp>\t<msg>"
				timestamp, msg = line.split('\t', 1)
				warnings.append((int(timestamp), msg.strip()))

			# stop listening to loggers that are closed
			self.warning_loggers -= closed_loggers

			# stop if none of the loggers have any output left
			if not loggers:
				break

		# sort into timestamp order
		warnings.sort()
		return warnings


	def _render_record(self, status_code, subdir, operation, status='',
			   epoch_time=None, record_prefix=None):
		"""
		Internal Function to generate a record to be written into a
		status log. For use by server_job.* classes only.
		"""
		if subdir:
			# NOTE(review): re.match only tests the first character;
			# re.search was likely intended to catch embedded
			# newlines/tabs anywhere in the string
			if re.match(r'[\n\t]', subdir):
				raise ValueError('Invalid character in subdir string')
			substr = subdir
		else:
			substr = '----'

		if not re.match(r'(START|(END )?(GOOD|WARN|FAIL|ABORT))$', \
								status_code):
			raise ValueError('Invalid status code supplied: %s' % status_code)
		if not operation:
			operation = '----'
		# NOTE(review): same first-character-only check as above
		if re.match(r'[\n\t]', operation):
			raise ValueError('Invalid character in operation string')
		operation = operation.rstrip()
		status = status.rstrip()
		status = re.sub(r"\t", "  ", status)
		# Ensure any continuation lines are marked so we can
		# detect them in the status file to ensure it is parsable.
		status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

		# Generate timestamps for inclusion in the logs
		if epoch_time is None:
			epoch_time = int(time.time())
		local_time = time.localtime(epoch_time)
		epoch_time_str = "timestamp=%d" % (epoch_time,)
		local_time_str = time.strftime("localtime=%b %d %H:%M:%S",
					       local_time)

		if record_prefix is None:
			record_prefix = self.record_prefix

		msg = '\t'.join(str(x) for x in (status_code, substr, operation,
						 epoch_time_str, local_time_str,
						 status))
		return record_prefix + msg + '\n'


	def _record_prerendered(self, msg):
		"""
		Record a pre-rendered msg into the status logs. The only
		change this makes to the message is to add on the local
		indentation. Should not be called outside of server_job.*
		classes. Unlike __record, this does not write the message
		to standard output.
		"""
		status_file = os.path.join(self.resultdir, 'status.log')
		status_log = open(status_file, 'a')
		need_reparse = False
		for line in msg.splitlines():
			line = self.record_prefix + line + '\n'
			status_log.write(line)
			if self.__need_reparse(line):
				need_reparse = True
		status_log.close()
		if need_reparse:
			self.__parse_status()


	def __record(self, status_code, subdir, operation, status='',
		     epoch_time=None):
		"""
		Actual function for recording a single line into the status
		logs. Should never be called directly, only by job.record as
		this would bypass the console monitor logging.
		"""

		msg = self._render_record(status_code, subdir, operation,
					  status, epoch_time)


		# the rendered line goes to stdout, the job-level status.log,
		# and (when a subdir is given) the per-test status file too
		status_file = os.path.join(self.resultdir, 'status.log')
		sys.stdout.write(msg)
		open(status_file, "a").write(msg)
		if subdir:
			test_dir = os.path.join(self.resultdir, subdir)
			if not os.path.exists(test_dir):
				os.mkdir(test_dir)
			status_file = os.path.join(test_dir, 'status')
			open(status_file, "a").write(msg)
		if self.__need_reparse(msg):
			self.__parse_status()


	def __need_reparse(self, line):
		"""Return True if writing this status line should trigger a
		re-run of the tko parser (see __parse_status)."""
		# the parser will not record results if lines have more than
		# one level of indentation
		indent = len(re.search(r"^(\t*)", line).group(1))
		if indent > 1:
			return False
		# we can also skip START lines, as they add nothing
		line = line.lstrip("\t")
		if line.startswith("START\t"):
			return False
		# otherwise, we should do a parse
		return True


	def __parse_status(self):
		"""
		If a .parse.cmd file is present in the results directory,
		launch the tko parser.
		"""
		cmdfile = os.path.join(self.resultdir, '.parse.cmd')
		if os.path.exists(cmdfile):
			cmd = open(cmdfile).read().strip()
			# fire and forget; the parser runs asynchronously
			subprocess.Popen(cmd, shell=True)
499
500
501# a file-like object for catching stderr from an autotest client and
502# extracting status logs from it
class client_logger(object):
	"""Partial file object to write to both stdout and
	the status log file.  We only implement those methods
	utils.run() actually calls.
	"""
	# matches lines the client quoted as status output, in the form
	# "AUTOTEST_STATUS:<tag>:<rendered status line>"
	parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
	# captures the leading tab indentation of a status line
	extract_indent = re.compile(r"^(\t*).*$")

	def __init__(self, job):
		self.job = job
		# partial line carried over from the previous write() call
		self.leftover = ""
		# most recent line pushed out to the status log
		self.last_line = ""
		# nested dict of buffered status lines, keyed by the integer
		# components of each line's tag, with line lists under "logs"
		self.logs = {}


	def _process_log_dict(self, log_dict):
		"""Flatten a nested log dict into an ordered list of lines,
		emptying log_dict as a side effect."""
		log_list = log_dict.pop("logs", [])
		for key in sorted(log_dict.iterkeys()):
			log_list += self._process_log_dict(log_dict.pop(key))
		return log_list


	def _process_logs(self):
		"""Go through the accumulated logs in self.log and print them
		out to stdout and the status log. Note that this processes
		logs in an ordering where:

		1) logs to different tags are never interleaved
		2) logs to x.y come before logs to x.y.z for all z
		3) logs to x.y come before x.z whenever y < z

		Note that this will in general not be the same as the
		chronological ordering of the logs. However, if a chronological
		ordering is desired that one can be reconstructed from the
		status log by looking at timestamp lines."""
		log_list = self._process_log_dict(self.logs)
		for line in log_list:
			self.job._record_prerendered(line + '\n')
		if log_list:
			self.last_line = log_list[-1]


	def _process_quoted_line(self, tag, line):
		"""Process a line quoted with an AUTOTEST_STATUS flag. If the
		tag is blank then we want to push out all the data we've been
		building up in self.logs, and then the newest line. If the
		tag is not blank, then push the line into the logs for handling
		later."""
		print line
		if tag == "":
			self._process_logs()
			self.job._record_prerendered(line + '\n')
			self.last_line = line
		else:
			# descend into (creating as needed) the nested dict
			# addressed by the dotted integer tag, e.g. "1.2.3"
			tag_parts = [int(x) for x in tag.split(".")]
			log_dict = self.logs
			for part in tag_parts:
				log_dict = log_dict.setdefault(part, {})
			log_list = log_dict.setdefault("logs", [])
			log_list.append(line)


	def _process_line(self, line):
		"""Write out a line of data to the appropriate stream. Status
		lines sent by autotest will be prepended with
		"AUTOTEST_STATUS", and all other lines are ssh error
		messages."""
		match = self.parser.search(line)
		if match:
			tag, line = match.groups()
			self._process_quoted_line(tag, line)
		else:
			print line


	def _format_warnings(self, last_line, warnings):
		"""Render (timestamp, msg) warning tuples as WARN status
		lines, indented to match the surrounding log context."""
		# use the indentation of whatever the last log line was
		indent = self.extract_indent.match(last_line).group(1)
		# if the last line starts a new group, add an extra indent
		if last_line.lstrip('\t').startswith("START\t"):
			indent += '\t'
		return [self.job._render_record("WARN", None, None, msg,
						timestamp, indent).rstrip('\n')
			for timestamp, msg in warnings]


	def _process_warnings(self, last_line, log_dict, warnings):
		"""Insert rendered warnings into the deepest active sub-job
		log buffers, writing them to stdout when appended here."""
		if log_dict.keys() in ([], ["logs"]):
			# there are no sub-jobs, just append the warnings here
			warnings = self._format_warnings(last_line, warnings)
			log_list = log_dict.setdefault("logs", [])
			log_list += warnings
			for warning in warnings:
				sys.stdout.write(warning + '\n')
		else:
			# there are sub-jobs, so put the warnings in there
			log_list = log_dict.get("logs", [])
			if log_list:
				last_line = log_list[-1]
			for key in sorted(log_dict.iterkeys()):
				if key != "logs":
					self._process_warnings(last_line,
							       log_dict[key],
							       warnings)


	def write(self, data):
		"""Accept a chunk of client output, interleaving any pending
		console warnings, and process every complete line in it."""
		# first check for any new console warnings
		warnings = self.job._read_warnings()
		self._process_warnings(self.last_line, self.logs, warnings)
		# now process the newest data written out
		data = self.leftover + data
		lines = data.split("\n")
		# process every line but the last one
		for line in lines[:-1]:
			self._process_line(line)
		# save the last line for later processing
		# since we may not have the whole line yet
		self.leftover = lines[-1]


	def flush(self):
		"""Flush stdout (the only stream we write synchronously)."""
		sys.stdout.flush()


	def close(self):
		"""Process any trailing partial line and drain all buffered
		logs before shutting down."""
		if self.leftover:
			self._process_line(self.leftover)
		self._process_logs()
		self.flush()
633