utils.py revision e9be8c38ab90271133c8b3989055a9ef6aece480
1#!/usr/bin/python 2# 3# Copyright 2008 Google Inc. Released under the GPL v2 4 5import os, pickle, random, re, select, shutil, signal, StringIO, subprocess 6import socket, sys, time, textwrap, urllib, urlparse 7import error, barrier 8 9 10def read_one_line(filename): 11 return open(filename, 'r').readline().strip() 12 13 14def write_one_line(filename, str): 15 open(filename, 'w').write(str.rstrip() + "\n") 16 17 18def read_keyval(path): 19 """ 20 Read a key-value pair format file into a dictionary, and return it. 21 Takes either a filename or directory name as input. If it's a 22 directory name, we assume you want the file to be called keyval. 23 """ 24 if os.path.isdir(path): 25 path = os.path.join(path, 'keyval') 26 keyval = {} 27 for line in open(path): 28 line = re.sub('#.*', '', line.rstrip()) 29 if not re.search(r'^[-\w]+=', line): 30 raise ValueError('Invalid format line: %s' % line) 31 key, value = line.split('=', 1) 32 if re.search('^\d+$', value): 33 value = int(value) 34 elif re.search('^(\d+\.)?\d+$', value): 35 value = float(value) 36 keyval[key] = value 37 return keyval 38 39 40def write_keyval(path, dictionary, type_tag=None): 41 """ 42 Write a key-value pair format file out to a file. This uses append 43 mode to open the file, so existing text will not be overwritten or 44 reparsed. 45 46 If type_tag is None, then the key must be composed of alphanumeric 47 characters (or dashes+underscores). However, if type-tag is not 48 null then the keys must also have "{type_tag}" as a suffix. At 49 the moment the only valid values of type_tag are "attr" and "perf". 50 """ 51 if os.path.isdir(path): 52 path = os.path.join(path, 'keyval') 53 keyval = open(path, 'a') 54 55 if type_tag is None: 56 key_regex = re.compile(r'^[-\w]+$') 57 else: 58 if type_tag not in ('attr', 'perf'): 59 raise ValueError('Invalid type tag: %s' % type_tag) 60 escaped_tag = re.escape(type_tag) 61 key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag) 62 try: 63 for key, value in dictionary.iteritems(): 64 if not key_regex.search(key): 65 raise ValueError('Invalid key: %s' % key) 66 keyval.write('%s=%s\n' % (key, value)) 67 finally: 68 keyval.close() 69 70 71def is_url(path): 72 """Return true if path looks like a URL""" 73 # for now, just handle http and ftp 74 url_parts = urlparse.urlparse(path) 75 return (url_parts[0] in ('http', 'ftp')) 76 77 78def urlopen(url, data=None, proxies=None, timeout=300): 79 """Wrapper to urllib.urlopen with timeout addition.""" 80 81 # Save old timeout 82 old_timeout = socket.getdefaulttimeout() 83 socket.setdefaulttimeout(timeout) 84 try: 85 return urllib.urlopen(url, data=data, proxies=proxies) 86 finally: 87 socket.setdefaulttimeout(old_timeout) 88 89 90def urlretrieve(url, filename=None, reporthook=None, data=None, timeout=300): 91 """Wrapper to urllib.urlretrieve with timeout addition.""" 92 old_timeout = socket.getdefaulttimeout() 93 socket.setdefaulttimeout(timeout) 94 try: 95 return urllib.urlretrieve(url, filename=filename, 96 reporthook=reporthook, data=data) 97 finally: 98 socket.setdefaulttimeout(old_timeout) 99 100 101def get_file(src, dest, permissions=None): 102 """Get a file from src, which can be local or a remote URL""" 103 if (src == dest): 104 return 105 if (is_url(src)): 106 print 'PWD: ' + os.getcwd() 107 print 'Fetching \n\t', src, '\n\t->', dest 108 try: 109 urllib.urlretrieve(src, dest) 110 except IOError, e: 111 raise error.AutotestError('Unable to retrieve %s (to %s)' 112 % (src, dest), e) 113 else: 114 shutil.copyfile(src, dest) 115 if permissions: 116 os.chmod(dest, permissions) 117 return dest 118 119 120def unmap_url(srcdir, src, destdir='.'): 121 """ 122 Receives either a path to a local file or a URL. 123 returns either the path to the local file, or the fetched URL 124 125 unmap_url('/usr/src', 'foo.tar', '/tmp') 126 = '/usr/src/foo.tar' 127 unmap_url('/usr/src', 'http://site/file', '/tmp') 128 = '/tmp/file' 129 (after retrieving it) 130 """ 131 if is_url(src): 132 url_parts = urlparse.urlparse(src) 133 filename = os.path.basename(url_parts[2]) 134 dest = os.path.join(destdir, filename) 135 return get_file(src, dest) 136 else: 137 return os.path.join(srcdir, src) 138 139 140def update_version(srcdir, preserve_srcdir, new_version, install, 141 *args, **dargs): 142 """ 143 Make sure srcdir is version new_version 144 145 If not, delete it and install() the new version. 146 147 In the preserve_srcdir case, we just check it's up to date, 148 and if not, we rerun install, without removing srcdir 149 """ 150 versionfile = os.path.join(srcdir, '.version') 151 install_needed = True 152 153 if os.path.exists(versionfile): 154 old_version = pickle.load(open(versionfile)) 155 if old_version == new_version: 156 install_needed = False 157 158 if install_needed: 159 if not preserve_srcdir and os.path.exists(srcdir): 160 shutil.rmtree(srcdir) 161 install(*args, **dargs) 162 if os.path.exists(srcdir): 163 pickle.dump(new_version, open(versionfile, 'w')) 164 165 166def run(command, timeout=None, ignore_status=False, 167 stdout_tee=None, stderr_tee=None): 168 """ 169 Run a command on the host. 170 171 Args: 172 command: the command line string 173 timeout: time limit in seconds before attempting to 174 kill the running process. The run() function 175 will take a few seconds longer than 'timeout' 176 to complete if it has to kill the process. 177 ignore_status: do not raise an exception, no matter what 178 the exit code of the command is. 179 stdout_tee: optional file-like object to which stdout data 180 will be written as it is generated (data will still 181 be stored in result.stdout) 182 stderr_tee: likewise for stderr 183 184 Returns: 185 a CmdResult object 186 187 Raises: 188 CmdError: the exit code of the command 189 execution was not 0 190 """ 191 return join_bg_job(run_bg(command), command, timeout, ignore_status, 192 stdout_tee, stderr_tee) 193 194 195def run_bg(command): 196 """Run the command in a subprocess and return the subprocess.""" 197 result = CmdResult(command) 198 def reset_sigpipe(): 199 signal.signal(signal.SIGPIPE, signal.SIG_DFL) 200 sp = subprocess.Popen(command, stdout=subprocess.PIPE, 201 stderr=subprocess.PIPE, preexec_fn=reset_sigpipe, 202 shell=True, executable="/bin/bash") 203 return sp, result 204 205 206def join_bg_job(bg_job, command, timeout=None, ignore_status=False, 207 stdout_tee=None, stderr_tee=None): 208 """Join the subprocess with the current thread. See run description.""" 209 sp, result = bg_job 210 stdout_file = StringIO.StringIO() 211 stderr_file = StringIO.StringIO() 212 (ret, timeouterr) = (0, False) 213 214 try: 215 # We are holding ends to stdin, stdout pipes 216 # hence we need to be sure to close those fds no mater what 217 start_time = time.time() 218 (ret, timeouterr) = _wait_for_command(sp, start_time, 219 timeout, stdout_file, stderr_file, 220 stdout_tee, stderr_tee) 221 result.exit_status = ret 222 result.duration = time.time() - start_time 223 # don't use os.read now, so we get all the rest of the output 224 _process_output(sp.stdout, stdout_file, stdout_tee, 225 use_os_read=False) 226 _process_output(sp.stderr, stderr_file, stderr_tee, 227 use_os_read=False) 228 finally: 229 # close our ends of the pipes to the sp no matter what 230 sp.stdout.close() 231 sp.stderr.close() 232 233 result.stdout = stdout_file.getvalue() 234 result.stderr = stderr_file.getvalue() 235 236 if result.exit_status != 0: 237 if timeouterr: 238 raise error.CmdError(command, result, "Command did not " 239 "complete within %d seconds" % timeout) 240 elif not ignore_status: 241 raise error.CmdError(command, result, 242 "Command returned non-zero exit status") 243 244 return result 245 246# this returns a tuple with the return code and a flag to specify if the error 247# is due to the process not terminating within timeout 248def _wait_for_command(subproc, start_time, timeout, stdout_file, stderr_file, 249 stdout_tee, stderr_tee): 250 if timeout: 251 stop_time = start_time + timeout 252 time_left = stop_time - time.time() 253 else: 254 time_left = None # so that select never times out 255 while not timeout or time_left > 0: 256 # select will return when stdout is ready (including when it is 257 # EOF, that is the process has terminated). 258 ready, _, _ = select.select([subproc.stdout, subproc.stderr], 259 [], [], time_left) 260 # os.read() has to be used instead of 261 # subproc.stdout.read() which will otherwise block 262 if subproc.stdout in ready: 263 _process_output(subproc.stdout, stdout_file, 264 stdout_tee) 265 if subproc.stderr in ready: 266 _process_output(subproc.stderr, stderr_file, 267 stderr_tee) 268 269 exit_status_indication = subproc.poll() 270 271 if exit_status_indication is not None: 272 return (exit_status_indication, False) 273 274 if timeout: 275 time_left = stop_time - time.time() 276 277 # the process has not terminated within timeout, 278 # kill it via an escalating series of signals. 279 if exit_status_indication is None: 280 exit_status_indication = nuke_subprocess(subproc) 281 282 return (exit_status_indication, True) 283 284 285def nuke_subprocess(subproc): 286 # the process has not terminated within timeout, 287 # kill it via an escalating series of signals. 288 signal_queue = [signal.SIGTERM, signal.SIGKILL] 289 for sig in signal_queue: 290 try: 291 os.kill(subproc.pid, sig) 292 # The process may have died before we could kill it. 293 except OSError: 294 pass 295 296 for i in range(5): 297 rc = subproc.poll() 298 if rc != None: 299 return rc 300 time.sleep(1) 301 302 303def nuke_pid(pid): 304 # the process has not terminated within timeout, 305 # kill it via an escalating series of signals. 306 signal_queue = [signal.SIGTERM, signal.SIGKILL] 307 for sig in signal_queue: 308 try: 309 os.kill(pid, sig) 310 311 # The process may have died before we could kill it. 312 except OSError: 313 pass 314 315 try: 316 for i in range(5): 317 status = os.waitpid(pid, os.WNOHANG)[0] 318 if status == pid: 319 return 320 time.sleep(1) 321 322 if status != pid: 323 raise error.AutoservRunError('Could not kill %d' 324 % pid, None) 325 326 # the process died before we join it. 327 except OSError: 328 pass 329 330 331def _process_output(pipe, fbuffer, teefile=None, use_os_read=True): 332 if use_os_read: 333 data = os.read(pipe.fileno(), 1024) 334 else: 335 data = pipe.read() 336 fbuffer.write(data) 337 if teefile: 338 teefile.write(data) 339 teefile.flush() 340 341 342def system(command, timeout=None, ignore_status=False): 343 return run(command, timeout, ignore_status, 344 stdout_tee=sys.stdout, stderr_tee=sys.stderr).exit_status 345 346 347def system_output(command, timeout=None, ignore_status=False, 348 retain_output=False): 349 if retain_output: 350 out = run(command, timeout, ignore_status, 351 stdout_tee=sys.stdout, stderr_tee=sys.stderr).stdout 352 else: 353 out = run(command, timeout, ignore_status).stdout 354 if out[-1:] == '\n': out = out[:-1] 355 return out 356 357""" 358This function is used when there is a need to run more than one 359job simultaneously starting exactly at the same time. It basically returns 360a modified control file (containing the synchronization code prepended) 361whenever it is ready to run the control file. The synchronization 362is done using barriers to make sure that the jobs start at the same time. 363 364Here is how the synchronization is done to make sure that the tests 365start at exactly the same time on the client. 366sc_bar is a server barrier and s_bar, c_bar are the normal barriers 367 368 Job1 Job2 ...... JobN 369 Server: | sc_bar 370 Server: | s_bar ...... s_bar 371 Server: | at.run() at.run() ...... at.run() 372 ----------|------------------------------------------------------ 373 Client | sc_bar 374 Client | c_bar c_bar ...... c_bar 375 Client | <run test> <run test> ...... <run test> 376 377 378PARAMS: 379 control_file : The control file which to which the above synchronization 380 code would be prepended to 381 host_name : The host name on which the job is going to run 382 host_num (non negative) : A number to identify the machine so that we have 383 different sets of s_bar_ports for each of the machines. 384 instance : The number of the job 385 num_jobs : Total number of jobs that are going to run in parallel with 386 this job starting at the same time 387 port_base : Port number that is used to derive the actual barrier ports. 388 389RETURN VALUE: 390 The modified control file. 391 392""" 393def get_sync_control_file(control, host_name, host_num, 394 instance, num_jobs, port_base=63100): 395 sc_bar_port = port_base 396 c_bar_port = port_base 397 if host_num < 0: 398 print "Please provide a non negative number for the host" 399 return None 400 s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are 401 # the same for a given machine 402 403 sc_bar_timeout = 180 404 s_bar_timeout = c_bar_timeout = 120 405 406 # The barrier code snippet is prepended into the conrol file 407 # dynamically before at.run() is called finally. 408 control_new = [] 409 410 # jobid is the unique name used to identify the processes 411 # trying to reach the barriers 412 jobid = "%s#%d" % (host_name, instance) 413 414 rendv = [] 415 # rendvstr is a temp holder for the rendezvous list of the processes 416 for n in range(num_jobs): 417 rendv.append("'%s#%d'" % (host_name, n)) 418 rendvstr = ",".join(rendv) 419 420 if instance == 0: 421 # Do the setup and wait at the server barrier 422 # Clean up the tmp and the control dirs for the first instance 423 control_new.append('if os.path.exists(job.tmpdir):') 424 control_new.append("\t system('umount -f %s > /dev/null" 425 "2> /dev/null' % job.tmpdir," 426 "ignore_status=True)") 427 control_new.append("\t system('rm -rf ' + job.tmpdir)") 428 control_new.append( 429 'b0 = job.barrier("%s", "sc_bar", %d, port=%d)' 430 % (jobid, sc_bar_timeout, sc_bar_port)) 431 control_new.append( 432 'b0.rendevous_servers("PARALLEL_MASTER", "%s")' 433 % jobid) 434 435 elif instance == 1: 436 # Wait at the server barrier to wait for instance=0 437 # process to complete setup 438 b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout, 439 port=sc_bar_port) 440 b0.rendevous_servers("PARALLEL_MASTER", jobid) 441 442 if(num_jobs > 2): 443 b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout, 444 port=s_bar_port) 445 b1.rendevous(rendvstr) 446 447 else: 448 # For the rest of the clients 449 b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port) 450 b2.rendevous(rendvstr) 451 452 # Client side barrier for all the tests to start at the same time 453 control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)' 454 % (jobid, c_bar_timeout, c_bar_port)) 455 control_new.append("b1.rendevous(%s)" % rendvstr) 456 457 # Stick in the rest of the control file 458 control_new.append(control) 459 460 return "\n".join(control_new) 461 462 463class CmdResult(object): 464 """ 465 Command execution result. 466 467 command: String containing the command line itself 468 exit_status: Integer exit code of the process 469 stdout: String containing stdout of the process 470 stderr: String containing stderr of the process 471 duration: Elapsed wall clock time running the process 472 """ 473 474 475 def __init__(self, command=None, stdout="", stderr="", 476 exit_status=None, duration=0): 477 self.command = command 478 self.exit_status = exit_status 479 self.stdout = stdout 480 self.stderr = stderr 481 self.duration = duration 482 483 484 def __repr__(self): 485 wrapper = textwrap.TextWrapper(width = 78, 486 initial_indent="\n ", 487 subsequent_indent=" ") 488 489 stdout = self.stdout.rstrip() 490 if stdout: 491 stdout = "\nstdout:\n%s" % stdout 492 493 stderr = self.stderr.rstrip() 494 if stderr: 495 stderr = "\nstderr:\n%s" % stderr 496 497 return ("* Command: %s\n" 498 "Exit status: %s\n" 499 "Duration: %s\n" 500 "%s" 501 "%s" 502 % (wrapper.fill(self.command), self.exit_status, 503 self.duration, stdout, stderr)) 504 505 506class run_randomly: 507 def __init__(self, run_sequentially=False): 508 # Run sequentially is for debugging control files 509 self.test_list = [] 510 self.run_sequentially = run_sequentially 511 512 513 def add(self, *args, **dargs): 514 test = (args, dargs) 515 self.test_list.append(test) 516 517 518 def run(self, fn): 519 while self.test_list: 520 test_index = random.randint(0, len(self.test_list)-1) 521 if self.run_sequentially: 522 test_index = 0 523 (args, dargs) = self.test_list.pop(test_index) 524 fn(*args, **dargs) 525