utils.py revision d93d7d2a9728bbed6d125a6d8a9f61bb3f2b6718
1#!/usr/bin/python
2#
3# Copyright 2008 Google Inc. Released under the GPL v2
4
5import os, pickle, random, re, select, shutil, signal, StringIO, subprocess
6import sys, time, textwrap, urllib, urlparse
7import error, barrier
8
9
def read_one_line(filename):
	"""Return the first line of filename, with surrounding whitespace stripped."""
	f = open(filename, 'r')
	try:
		return f.readline().strip()
	finally:
		# the original leaked the handle until GC; close it explicitly
		f.close()
12
13
def write_one_line(filename, str):
	"""Write str to filename as a single line (trailing whitespace replaced
	by one newline), truncating any existing content.

	NOTE: the parameter name 'str' shadows the builtin; kept for API compat.
	"""
	f = open(filename, 'w')
	try:
		f.write(str.rstrip() + "\n")
	finally:
		# the original leaked the handle until GC; close it explicitly
		f.close()
16
17
def read_keyval(path):
	"""
	Read a key-value pair format file into a dictionary, and return it.
	Takes either a filename or directory name as input. If it's a
	directory name, we assume you want the file to be called keyval.

	Values that look like non-negative integers or decimals are converted
	to int/float; everything else is kept as a string. '#' starts a
	comment. Raises ValueError on a malformed line.
	"""
	if os.path.isdir(path):
		path = os.path.join(path, 'keyval')
	keyval = {}
	f = open(path)
	try:
		for line in f:
			line = re.sub(r'#.*', '', line.rstrip())
			# skip blank and comment-only lines; previously these
			# raised ValueError even though comments are supported
			if not line:
				continue
			if not re.search(r'^[-\w]+=', line):
				raise ValueError('Invalid format line: %s' % line)
			key, value = line.split('=', 1)
			if re.search(r'^\d+$', value):
				value = int(value)
			elif re.search(r'^(\d+\.)?\d+$', value):
				value = float(value)
			keyval[key] = value
	finally:
		f.close()
	return keyval
38
39
def write_keyval(path, dictionary, type_tag=None):
	"""
	Write a key-value pair format file out to a file. This uses append
	mode to open the file, so existing text will not be overwritten or
	reparsed.

	If type_tag is None, then the key must be composed of alphanumeric
	characters (or dashes+underscores). However, if type_tag is not
	None then the keys must also have "{type_tag}" as a suffix. At
	the moment the only valid values of type_tag are "attr" and "perf".

	Raises ValueError for an invalid type_tag or an invalid key.
	"""
	if os.path.isdir(path):
		path = os.path.join(path, 'keyval')
	keyval = open(path, 'a')

	if type_tag is None:
		key_regex = re.compile(r'^[-\w]+$')
	else:
		if type_tag not in ('attr', 'perf'):
			raise ValueError('Invalid type tag: %s' % type_tag)
		escaped_tag = re.escape(type_tag)
		key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag)
	try:
		# items() instead of py2-only iteritems(); identical behavior
		for key, value in dictionary.items():
			if not key_regex.search(key):
				raise ValueError('Invalid key: %s' % key)
			keyval.write('%s=%s\n' % (key, value))
	finally:
		keyval.close()
69
70
def is_url(path):
	"""Return true if path looks like a URL (http or ftp only, for now)."""
	scheme = urlparse.urlparse(path)[0]
	return scheme in ('http', 'ftp')
76
77
78def get_file(src, dest, permissions=None):
79	"""Get a file from src, which can be local or a remote URL"""
80	if (src == dest):
81		return
82	if (is_url(src)):
83		print 'PWD: ' + os.getcwd()
84		print 'Fetching \n\t', src, '\n\t->', dest
85		try:
86			urllib.urlretrieve(src, dest)
87		except IOError, e:
88			raise error.AutotestError('Unable to retrieve %s (to %s)'
89					    % (src, dest), e)
90	else:
91		shutil.copyfile(src, dest)
92	if permissions:
93		os.chmod(dest, permissions)
94	return dest
95
96
def unmap_url(srcdir, src, destdir='.'):
	"""
	Receives either a path to a local file or a URL.
	returns either the path to the local file, or the fetched URL

	unmap_url('/usr/src', 'foo.tar', '/tmp')
				= '/usr/src/foo.tar'
	unmap_url('/usr/src', 'http://site/file', '/tmp')
				= '/tmp/file'
				(after retrieving it)
	"""
	if not is_url(src):
		# plain local file: resolve it relative to srcdir
		return os.path.join(srcdir, src)
	# URL: fetch into destdir, keeping the basename of the URL path
	filename = os.path.basename(urlparse.urlparse(src)[2])
	return get_file(src, os.path.join(destdir, filename))
115
116
def update_version(srcdir, preserve_srcdir, new_version, install,
		   *args, **dargs):
	"""
	Make sure srcdir is version new_version

	If not, delete it and install() the new version.

	In the preserve_srcdir case, we just check it's up to date,
	and if not, we rerun install, without removing srcdir

	install is called as install(*args, **dargs). The installed version
	is recorded in a pickled '.version' file inside srcdir.
	"""
	versionfile = os.path.join(srcdir, '.version')
	install_needed = True

	if os.path.exists(versionfile):
		# pickle streams are byte-oriented: open in binary mode
		# and close the handle instead of leaking it
		f = open(versionfile, 'rb')
		try:
			old_version = pickle.load(f)
		finally:
			f.close()
		if old_version == new_version:
			install_needed = False

	if install_needed:
		if not preserve_srcdir and os.path.exists(srcdir):
			shutil.rmtree(srcdir)
		install(*args, **dargs)
		# record the new version only if install() created srcdir
		if os.path.exists(srcdir):
			f = open(versionfile, 'wb')
			try:
				pickle.dump(new_version, f)
			finally:
				f.close()
141
142
def run(command, timeout=None, ignore_status=False,
	stdout_tee=None, stderr_tee=None):
	"""
	Run a command on the host and wait for it to complete.

	Args:
		command: the command line string
		timeout: time limit in seconds before attempting to
			kill the running process. The run() function
			will take a few seconds longer than 'timeout'
			to complete if it has to kill the process.
		ignore_status: do not raise an exception, no matter what
			the exit code of the command is.
		stdout_tee: optional file-like object to which stdout data
			will be written as it is generated (data will still
			be stored in result.stdout)
		stderr_tee: likewise for stderr

	Returns:
		a CmdResult object

	Raises:
		CmdError: the exit code of the command
			execution was not 0
	"""
	# start the process, then block until it finishes (or times out)
	bg_job = run_bg(command)
	return join_bg_job(bg_job, command, timeout, ignore_status,
			   stdout_tee, stderr_tee)
170
171
def run_bg(command):
	"""Launch command asynchronously via bash; return (Popen, CmdResult)."""
	sp = subprocess.Popen(command, stdout=subprocess.PIPE,
			      stderr=subprocess.PIPE,
			      shell=True, executable="/bin/bash")
	return sp, CmdResult(command)
179
180
def join_bg_job(bg_job, command, timeout=None, ignore_status=False,
	stdout_tee=None, stderr_tee=None):
	"""Join the subprocess with the current thread. See run description.

	bg_job is the (Popen, CmdResult) pair returned by run_bg(). The
	process's stdout/stderr are drained into the CmdResult (and the
	optional tee files), its pipes are always closed, and a CmdError
	is raised on timeout or (unless ignore_status) non-zero exit.
	"""
	sp, result = bg_job
	stdout_file = StringIO.StringIO()
	stderr_file = StringIO.StringIO()
	(ret, timeouterr) = (0, False)

	try:
		# We are holding ends to stdin, stdout pipes
		# hence we need to be sure to close those fds no matter what
		start_time = time.time()
		(ret, timeouterr) = _wait_for_command(sp, start_time,
					timeout, stdout_file, stderr_file,
					stdout_tee, stderr_tee)
		result.exit_status = ret
		result.duration = time.time() - start_time
		# don't use os.read now, so we get all the rest of the output
		_process_output(sp.stdout, stdout_file, stdout_tee,
				use_os_read=False)
		_process_output(sp.stderr, stderr_file, stderr_tee,
				use_os_read=False)
	finally:
		# close our ends of the pipes to the sp no matter what
		sp.stdout.close()
		sp.stderr.close()

	result.stdout = stdout_file.getvalue()
	result.stderr = stderr_file.getvalue()

	if result.exit_status != 0:
		if timeouterr:
			raise error.CmdError(command, result, "Command did not "
					     "complete within %d seconds" % timeout)
		elif not ignore_status:
			raise error.CmdError(command, result,
					     "Command returned non-zero exit status")

	return result
220
221# this returns a tuple with the return code and a flag to specify if the error
222# is due to the process not terminating within timeout
# this returns a tuple with the return code and a flag to specify if the error
# is due to the process not terminating within timeout
def _wait_for_command(subproc, start_time, timeout, stdout_file, stderr_file,
		      stdout_tee, stderr_tee):
	"""Wait for subproc to exit, draining stdout/stderr as they arrive.

	Returns (exit_status, timed_out). timed_out is True when the
	process had to be killed via nuke_subprocess().
	"""
	if timeout:
		stop_time = start_time + timeout
		time_left = stop_time - time.time()
	else:
		time_left = None # so that select never times out
	# poll() result; None means "still running". Initialized here so the
	# post-loop check is safe even when the deadline has already passed
	# on entry (previously that raised UnboundLocalError).
	exit_status_indication = None
	while not timeout or time_left > 0:
		# select will return when stdout is ready (including when it is
		# EOF, that is the process has terminated).
		ready, _, _ = select.select([subproc.stdout, subproc.stderr],
					     [], [], time_left)
		# os.read() has to be used instead of
		# subproc.stdout.read() which will otherwise block
		if subproc.stdout in ready:
			_process_output(subproc.stdout, stdout_file,
					stdout_tee)
		if subproc.stderr in ready:
			_process_output(subproc.stderr, stderr_file,
					stderr_tee)

		exit_status_indication = subproc.poll()

		if exit_status_indication is not None:
			return (exit_status_indication, False)

		if timeout:
			time_left = stop_time - time.time()

	# the process has not terminated within timeout,
	# kill it via an escalating series of signals.
	if exit_status_indication is None:
		exit_status_indication = nuke_subprocess(subproc)

	return (exit_status_indication, True)
258
259
def _process_output(pipe, fbuffer, teefile=None, use_os_read=True):
	"""Copy available data from pipe into fbuffer, teeing to teefile.

	With use_os_read, a single bounded os.read() is issued so the call
	never blocks on a still-open pipe; otherwise pipe.read() drains
	everything remaining (used once the process has exited).
	"""
	if use_os_read:
		chunk = os.read(pipe.fileno(), 1024)
	else:
		chunk = pipe.read()
	fbuffer.write(chunk)
	if teefile:
		teefile.write(chunk)
		teefile.flush()
269
270
271def nuke_subprocess(subproc):
272       # the process has not terminated within timeout,
273       # kill it via an escalating series of signals.
274       signal_queue = [signal.SIGTERM, signal.SIGKILL]
275       for sig in signal_queue:
276	       try:
277		       os.kill(subproc.pid, sig)
278	       # The process may have died before we could kill it.
279	       except OSError:
280		       pass
281
282	       for i in range(5):
283		       rc = subproc.poll()
284		       if rc != None:
285			       return rc
286		       time.sleep(1)
287
288
289def nuke_pid(pid):
290       # the process has not terminated within timeout,
291       # kill it via an escalating series of signals.
292       signal_queue = [signal.SIGTERM, signal.SIGKILL]
293       for sig in signal_queue:
294	       try:
295		       os.kill(pid, sig)
296
297	       # The process may have died before we could kill it.
298	       except OSError:
299		       pass
300
301	       try:
302		       for i in range(5):
303			       status = os.waitpid(pid, os.WNOHANG)[0]
304			       if status == pid:
305				       return
306			       time.sleep(1)
307
308		       if status != pid:
309			       raise error.AutoservRunError('Could not kill %d'
310				       % pid, None)
311
312	       # the process died before we join it.
313	       except OSError:
314		       pass
315
316
# NOTE(review): this is a byte-for-byte duplicate of the _process_output
# defined earlier in this file; this second definition silently shadows
# the first, and one of the two should be deleted.
def _process_output(pipe, fbuffer, teefile=None, use_os_read=True):
	"""Copy available data from pipe into fbuffer, teeing to teefile."""
	if use_os_read:
		# bounded read so we never block on a still-open pipe
		data = os.read(pipe.fileno(), 1024)
	else:
		# drain everything remaining (used after process exit)
		data = pipe.read()
	fbuffer.write(data)
	if teefile:
		teefile.write(data)
		teefile.flush()
326
327
def system(command, timeout=None, ignore_status=False):
	"""Run command, teeing its output to our stdout/stderr; return the
	exit status (raising CmdError on failure unless ignore_status)."""
	result = run(command, timeout, ignore_status,
		     stdout_tee=sys.stdout, stderr_tee=sys.stderr)
	return result.exit_status
331
332
def system_output(command, timeout=None, ignore_status=False,
		  retain_output=False):
	"""Run command and return its stdout, minus a single trailing
	newline. With retain_output, the output is also teed to our own
	stdout/stderr as it is produced."""
	if retain_output:
		result = run(command, timeout, ignore_status,
			     stdout_tee=sys.stdout, stderr_tee=sys.stderr)
	else:
		result = run(command, timeout, ignore_status)
	out = result.stdout
	if out.endswith('\n'):
		out = out[:-1]
	return out
342
343"""
344This function is used when there is a need to run more than one
345job simultaneously starting exactly at the same time. It basically returns
346a modified control file (containing the synchronization code prepended)
347whenever it is ready to run the control file. The synchronization
348is done using barriers to make sure that the jobs start at the same time.
349
350Here is how the synchronization is done to make sure that the tests
351start at exactly the same time on the client.
352sc_bar is a server barrier and s_bar, c_bar are the normal barriers
353
354                  Job1              Job2         ......      JobN
355 Server:   |                        sc_bar
356 Server:   |                        s_bar        ......      s_bar
357 Server:   |      at.run()         at.run()      ......      at.run()
358 ----------|------------------------------------------------------
359 Client    |      sc_bar
360 Client    |      c_bar             c_bar        ......      c_bar
361 Client    |    <run test>         <run test>    ......     <run test>
362
363
364PARAMS:
365   control_file : The control file which to which the above synchronization
366                  code would be prepended to
367   host_name    : The host name on which the job is going to run
368   host_num (non negative) : A number to identify the machine so that we have
369                  different sets of s_bar_ports for each of the machines.
370   instance     : The number of the job
371   num_jobs     : Total number of jobs that are going to run in parallel with
372                  this job starting at the same time
373   port_base    : Port number that is used to derive the actual barrier ports.
374
375RETURN VALUE:
376    The modified control file.
377
378"""
379def get_sync_control_file(control, host_name, host_num,
380			  instance, num_jobs, port_base=63100):
381	sc_bar_port = port_base
382	c_bar_port = port_base
383	if host_num < 0:
384		print "Please provide a non negative number for the host"
385		return None
386	s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
387                                              # the same for a given machine
388
389	sc_bar_timeout = 180
390	s_bar_timeout = c_bar_timeout = 120
391
392	# The barrier code snippet is prepended into the conrol file
393	# dynamically before at.run() is called finally.
394	control_new = []
395
396       	# jobid is the unique name used to identify the processes
397	# trying to reach the barriers
398	jobid = "%s#%d" % (host_name, instance)
399
400	rendv = []
401	# rendvstr is a temp holder for the rendezvous list of the processes
402	for n in range(num_jobs):
403		rendv.append("'%s#%d'" % (host_name, n))
404	rendvstr = ",".join(rendv)
405
406	if instance == 0:
407		# Do the setup and wait at the server barrier
408		# Clean up the tmp and the control dirs for the first instance
409		control_new.append('if os.path.exists(job.tmpdir):')
410		control_new.append("\t system('umount -f %s > /dev/null"
411				   "2> /dev/null' % job.tmpdir,"
412				   "ignore_status=True)")
413		control_new.append("\t system('rm -rf ' + job.tmpdir)")
414		control_new.append(
415		    'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
416		    % (jobid, sc_bar_timeout, sc_bar_port))
417		control_new.append(
418		'b0.rendevous_servers("PARALLEL_MASTER", "%s")'
419		 % jobid)
420
421	elif instance == 1:
422		# Wait at the server barrier to wait for instance=0
423		# process to complete setup
424		b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
425			     port=sc_bar_port)
426		b0.rendevous_servers("PARALLEL_MASTER", jobid)
427
428		if(num_jobs > 2):
429			b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
430				     port=s_bar_port)
431	        	b1.rendevous(rendvstr)
432
433	else:
434		# For the rest of the clients
435		b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
436		b2.rendevous(rendvstr)
437
438	# Client side barrier for all the tests to start at the same time
439	control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
440			% (jobid, c_bar_timeout, c_bar_port))
441	control_new.append("b1.rendevous(%s)" % rendvstr)
442
443	# Stick in the rest of the control file
444	control_new.append(control)
445
446	return "\n".join(control_new)
447
448
class CmdResult(object):
	"""
	Command execution result.

	command:     String containing the command line itself
	exit_status: Integer exit code of the process
	stdout:      String containing stdout of the process
	stderr:      String containing stderr of the process
	duration:    Elapsed wall clock time running the process
	"""


	def __init__(self, command = None):
		self.command = command
		self.exit_status = None
		self.stdout = ""
		self.stderr = ""
		self.duration = 0


	def __repr__(self):
		# indent the (possibly long) command under a leading newline
		wrapper = textwrap.TextWrapper(width = 78,
					       initial_indent="\n    ",
					       subsequent_indent="    ")

		out = self.stdout.rstrip()
		err = self.stderr.rstrip()
		if out:
			out = "\nstdout:\n%s" % out
		if err:
			err = "\nstderr:\n%s" % err

		header = ("* Command: %s\n"
			  "Exit status: %s\n"
			  "Duration: %s\n"
			  % (wrapper.fill(self.command), self.exit_status,
			     self.duration))
		return header + "%s%s" % (out, err)
489
490
class run_randomly:
	"""Collect deferred test invocations and execute them in random order."""


	def __init__(self):
		# each entry is an (args, dargs) pair for a later fn(...) call
		self.test_list = []


	def add(self, *args, **dargs):
		self.test_list.append((args, dargs))


	def run(self, fn):
		# repeatedly pick a random pending entry and invoke fn with it
		while self.test_list:
			idx = random.randint(0, len(self.test_list) - 1)
			args, dargs = self.test_list.pop(idx)
			fn(*args, **dargs)
508