utils.py revision d93d7d2a9728bbed6d125a6d8a9f61bb3f2b6718
#!/usr/bin/python
#
# Copyright 2008 Google Inc. Released under the GPL v2

import os, pickle, random, re, select, shutil, signal, StringIO, subprocess
import sys, time, textwrap, urllib, urlparse
import error, barrier


def read_one_line(filename):
    """Return the first line of filename with surrounding whitespace stripped."""
    infile = open(filename, 'r')
    try:
        return infile.readline().strip()
    finally:
        infile.close()


def write_one_line(filename, str):
    """
    Overwrite filename with str followed by a single newline.

    Trailing whitespace on str is removed before the newline is added.
    """
    outfile = open(filename, 'w')
    try:
        outfile.write(str.rstrip() + "\n")
    finally:
        outfile.close()


def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.

    Anything from a '#' to the end of the line is discarded. Values made
    only of digits are returned as int, values of the form digits.digits
    as float, everything else as a string.

    Raises:
            ValueError: a line (after comment removal) does not start
                    with "key=", where key is made of word characters
                    and dashes. This includes lines that are empty or
                    hold only a comment.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    keyval_file = open(path)
    try:
        for line in keyval_file:
            # remove the comment first, then the whitespace that preceded
            # it, so that "key=1   # note" still parses as the integer 1
            line = re.sub('#.*', '', line).rstrip()
            if not re.search(r'^[-\w]+=', line):
                raise ValueError('Invalid format line: %s' % line)
            key, value = line.split('=', 1)
            if re.search(r'^\d+$', value):
                value = int(value)
            elif re.search(r'^(\d+\.)?\d+$', value):
                value = float(value)
            keyval[key] = value
    finally:
        keyval_file.close()
    return keyval


def write_keyval(path, dictionary, type_tag=None):
    """
    Write a key-value pair format file out to a file. This uses append
    mode to open the file, so existing text will not be overwritten or
    reparsed.

    If type_tag is None, then the key must be composed of alphanumeric
    characters (or dashes+underscores). However, if type-tag is not
    null then the keys must also have "{type_tag}" as a suffix. At
    the moment the only valid values of type_tag are "attr" and "perf".

    Raises:
            ValueError: type_tag is not one of the valid values, or a
                    key does not match the required format. Pairs
                    written before an invalid key is met stay in the
                    file.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')

    # validate the tag before touching the file, so that a bad tag neither
    # creates the file nor leaves an open handle behind
    if type_tag is None:
        key_regex = re.compile(r'^[-\w]+$')
    else:
        if type_tag not in ('attr', 'perf'):
            raise ValueError('Invalid type tag: %s' % type_tag)
        escaped_tag = re.escape(type_tag)
        key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag)

    keyval = open(path, 'a')
    try:
        for key, value in dictionary.items():
            if not key_regex.search(key):
                raise ValueError('Invalid key: %s' % key)
            keyval.write('%s=%s\n' % (key, value))
    finally:
        keyval.close()


def is_url(path):
    """Return true if path looks like a URL"""
    # for now, just handle http and ftp
    url_parts = urlparse.urlparse(path)
    return (url_parts[0] in ('http', 'ftp'))


def get_file(src, dest, permissions=None):
    """
    Get a file from src, which can be local or a remote URL

    Returns dest once the file is in place. When src and dest are equal
    nothing is done at all (permissions are not applied either) and None
    is returned.

    Raises:
            error.AutotestError: src is a URL and retrieving it failed
                    with an IOError.
    """
    if (src == dest):
        return
    if (is_url(src)):
        print('PWD: ' + os.getcwd())
        # one formatted string; emits the same text as the former
        # comma-separated print statement
        print('Fetching \n\t%s \n\t-> %s' % (src, dest))
        try:
            urllib.urlretrieve(src, dest)
        except IOError:
            # fetched through sys.exc_info() so that no version-specific
            # except-clause syntax is needed
            e = sys.exc_info()[1]
            raise error.AutotestError('Unable to retrieve %s (to %s)'
                                      % (src, dest), e)
    else:
        shutil.copyfile(src, dest)
    if permissions:
        os.chmod(dest, permissions)
    return dest


def unmap_url(srcdir, src, destdir='.'):
    """
    Receives either a path to a local file or a URL.
    returns either the path to the local file, or the fetched URL

    unmap_url('/usr/src', 'foo.tar', '/tmp')
                            = '/usr/src/foo.tar'
    unmap_url('/usr/src', 'http://site/file', '/tmp')
                            = '/tmp/file'
                            (after retrieving it)
    """
    if is_url(src):
        url_parts = urlparse.urlparse(src)
        filename = os.path.basename(url_parts[2])
        dest = os.path.join(destdir, filename)
        return get_file(src, dest)
    else:
        return os.path.join(srcdir, src)


def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version

    If not, delete it and install() the new version.

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir

    The installed version is kept pickled in srcdir/.version; install is
    called as install(*args, **dargs).
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        # NOTE: unpickling runs whatever the file encodes; this is only
        # safe while .version is written by this function alone
        version_in = open(versionfile)
        try:
            old_version = pickle.load(version_in)
        finally:
            version_in.close()
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        if os.path.exists(srcdir):
            # close explicitly so the record is on disk when we return
            version_out = open(versionfile, 'w')
            try:
                pickle.dump(new_version, version_out)
            finally:
                version_out.close()


def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None):
    """
    Run a command on the host.

    Args:
            command: the command line string
            timeout: time limit in seconds before attempting to
                    kill the running process. The run() function
                    will take a few seconds longer than 'timeout'
                    to complete if it has to kill the process.
            ignore_status: do not raise an exception, no matter what
                    the exit code of the command is.
            stdout_tee: optional file-like object to which stdout data
                    will be written as it is generated (data will still
                    be stored in result.stdout)
            stderr_tee: likewise for stderr

    Returns:
            a CmdResult object

    Raises:
            CmdError: the exit code of the command
                    execution was not 0. A command that had to be
                    killed after 'timeout' raises this even when
                    ignore_status is set.
    """
    return join_bg_job(run_bg(command), command, timeout, ignore_status,
                       stdout_tee, stderr_tee)


def run_bg(command):
    """
    Run the command in a subprocess without waiting for it.

    The command is handed to /bin/bash, with stdout and stderr captured
    through pipes.

    Returns:
            a (subprocess.Popen, CmdResult) tuple suitable for
            join_bg_job(); the CmdResult only has its command set.
    """
    result = CmdResult(command)
    sp = subprocess.Popen(command, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE,
                          shell=True, executable="/bin/bash")
    return sp, result


def join_bg_job(bg_job, command, timeout=None, ignore_status=False,
                stdout_tee=None, stderr_tee=None):
    """
    Join the subprocess with the current thread. See run description.

    Args:
            bg_job: the (Popen, CmdResult) tuple returned by run_bg()
            command: the command line, used in the CmdError raised on
                    failure
            timeout, ignore_status, stdout_tee, stderr_tee: as for run()

    Returns:
            the CmdResult from bg_job, filled in with exit_status,
            duration (measured from the start of this call), stdout
            and stderr
    """
    sp, result = bg_job
    stdout_file = StringIO.StringIO()
    stderr_file = StringIO.StringIO()
    (ret, timeouterr) = (0, False)

    try:
        # We are holding ends to the stdout and stderr pipes,
        # hence we need to be sure to close those fds no matter what
        start_time = time.time()
        (ret, timeouterr) = _wait_for_command(sp, start_time,
                                              timeout, stdout_file,
                                              stderr_file,
                                              stdout_tee, stderr_tee)
        result.exit_status = ret
        result.duration = time.time() - start_time
        # don't use os.read now, so we get all the rest of the output
        _process_output(sp.stdout, stdout_file, stdout_tee,
                        use_os_read=False)
        _process_output(sp.stderr, stderr_file, stderr_tee,
                        use_os_read=False)
    finally:
        # close our ends of the pipes to the sp no matter what
        sp.stdout.close()
        sp.stderr.close()

    result.stdout = stdout_file.getvalue()
    result.stderr = stderr_file.getvalue()

    if result.exit_status != 0:
        if timeouterr:
            raise error.CmdError(command, result, "Command did not "
                                 "complete within %d seconds" % timeout)
        elif not ignore_status:
            raise error.CmdError(command, result,
                                 "Command returned non-zero exit status")

    return result


def _wait_for_command(subproc, start_time, timeout, stdout_file, stderr_file,
                      stdout_tee, stderr_tee):
    """
    Collect output from subproc until it exits or the time limit passes.

    Returns:
            a tuple with the return code and a flag to specify if the
            error is due to the process not terminating within timeout.
            In the timeout case the process is killed and the return
            code is whatever nuke_subprocess() reports.
    """
    if timeout:
        stop_time = start_time + timeout
        time_left = stop_time - time.time()
    else:
        time_left = None # so that select never times out
    while not timeout or time_left > 0:
        # select will return when stdout is ready (including when it is
        # EOF, that is the process has terminated).
        ready, _, _ = select.select([subproc.stdout, subproc.stderr],
                                    [], [], time_left)
        # os.read() has to be used instead of
        # subproc.stdout.read() which will otherwise block
        if subproc.stdout in ready:
            _process_output(subproc.stdout, stdout_file,
                            stdout_tee)
        if subproc.stderr in ready:
            _process_output(subproc.stderr, stderr_file,
                            stderr_tee)

        exit_status_indication = subproc.poll()

        if exit_status_indication is not None:
            return (exit_status_indication, False)

        if timeout:
            time_left = stop_time - time.time()

    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    # The loop only falls through with the process still running, or
    # without having run at all when the deadline had already passed, so
    # no status variable from the loop is consulted here.
    return (nuke_subprocess(subproc), True)


def _process_output(pipe, fbuffer, teefile=None, use_os_read=True):
    """
    Move data from pipe into fbuffer, and into teefile when one is given.

    With use_os_read a single os.read() of at most 1024 bytes is done on
    the pipe's file descriptor; otherwise pipe.read() is used, which
    reads until EOF.
    """
    if use_os_read:
        data = os.read(pipe.fileno(), 1024)
    else:
        data = pipe.read()
    fbuffer.write(data)
    if teefile:
        teefile.write(data)
        teefile.flush()


def nuke_subprocess(subproc):
    """
    Kill subproc via an escalating series of signals.

    SIGTERM is sent first, then SIGKILL; after each signal the process
    is polled once a second, up to five times.

    Returns:
            the exit status reported by subproc.poll(), or None if the
            process was still running after the last signal.
    """
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(subproc.pid, sig)
        # The process may have died before we could kill it.
        except OSError:
            pass

        for i in range(5):
            rc = subproc.poll()
            if rc is not None:
                return rc
            time.sleep(1)

    return None


def nuke_pid(pid):
    """
    Kill the process pid via an escalating series of signals.

    SIGTERM is sent first, then SIGKILL; after each signal the process
    is waited for (non-blocking) once a second, up to five times.

    Raises:
            error.AutoservRunError: the process could still not be
                    reaped after every signal had been tried.
    """
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    wait_failed = False
    for sig in signal_queue:
        try:
            os.kill(pid, sig)

        # The process may have died before we could kill it.
        except OSError:
            pass

        try:
            for i in range(5):
                status = os.waitpid(pid, os.WNOHANG)[0]
                if status == pid:
                    return
                time.sleep(1)
            wait_failed = False

        # the process died before we join it.
        except OSError:
            wait_failed = True

    # only give up once the strongest signal has had its chance; a wait
    # that errored out is not reported as a failure to kill
    if not wait_failed:
        raise error.AutoservRunError('Could not kill %d'
                                     % pid, None)


def system(command, timeout=None, ignore_status=False):
    """
    Run command through run(), copying its stdout and stderr to
    sys.stdout and sys.stderr as they arrive, and return its exit status.
    """
    return run(command, timeout, ignore_status,
               stdout_tee=sys.stdout, stderr_tee=sys.stderr).exit_status


def system_output(command, timeout=None, ignore_status=False,
                  retain_output=False):
    """
    Run command through run() and return its stdout, with one trailing
    newline removed if present.

    With retain_output the output is additionally copied to sys.stdout
    and sys.stderr while the command runs.
    """
    if retain_output:
        out = run(command, timeout, ignore_status,
                  stdout_tee=sys.stdout, stderr_tee=sys.stderr).stdout
    else:
        out = run(command, timeout, ignore_status).stdout
    if out[-1:] == '\n': out = out[:-1]
    return out


def get_sync_control_file(control, host_name, host_num,
                          instance, num_jobs, port_base=63100):
    """
    This function is used when there is a need to run more than one
    job simultaneously starting exactly at the same time. It basically
    returns a modified control file (containing the synchronization code
    prepended) whenever it is ready to run the control file. The
    synchronization is done using barriers to make sure that the jobs
    start at the same time.

    Here is how the synchronization is done to make sure that the tests
    start at exactly the same time on the client.
    sc_bar is a server barrier and s_bar, c_bar are the normal barriers

                  Job1              Job2         ......      JobN
     Server:   |                        sc_bar
     Server:   |                        s_bar        ......   s_bar
     Server:   |      at.run()         at.run()    ......     at.run()
     ----------|------------------------------------------------------
     Client    |      sc_bar
     Client    |      c_bar             c_bar        ......    c_bar
     Client    |    <run test>         <run test>    ......  <run test>


    PARAMS:
       control : The control file to which the above synchronization
                 code would be prepended
       host_name : The host name on which the job is going to run
       host_num (non negative) : A number to identify the machine so that
                 we have different sets of s_bar_ports for each of the
                 machines.
       instance : The number of the job
       num_jobs : Total number of jobs that are going to run in parallel
                 with this job starting at the same time
       port_base : Port number that is used to derive the actual barrier
                 ports.

    RETURN VALUE:
        The modified control file, or None (after printing a message)
        when host_num is negative.
    """
    sc_bar_port = port_base
    c_bar_port = port_base
    if host_num < 0:
        print("Please provide a non negative number for the host")
        return None
    s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
                                          # the same for a given machine

    sc_bar_timeout = 180
    s_bar_timeout = c_bar_timeout = 120

    # The barrier code snippet is prepended into the control file
    # dynamically before at.run() is called finally.
    control_new = []

    # jobid is the unique name used to identify the processes
    # trying to reach the barriers
    jobid = "%s#%d" % (host_name, instance)

    # rendvstr is a temp holder for the rendezvous list of the processes
    rendv = ["'%s#%d'" % (host_name, n) for n in range(num_jobs)]
    rendvstr = ",".join(rendv)

    if instance == 0:
        # Do the setup and wait at the server barrier
        # Clean up the tmp and the control dirs for the first instance
        # NOTE(review): the whitespace after "\t" in the generated lines
        # is reproduced as found; confirm it matches the intended indent
        control_new.append('if os.path.exists(job.tmpdir):')
        # the space before "2>" matters: without it the shell redirects
        # stdout to a file named /dev/null2 and leaves stderr alone
        control_new.append("\t system('umount -f %s > /dev/null "
                           "2> /dev/null' % job.tmpdir,"
                           "ignore_status=True)")
        control_new.append("\t system('rm -rf ' + job.tmpdir)")
        control_new.append(
                'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
                % (jobid, sc_bar_timeout, sc_bar_port))
        control_new.append(
                'b0.rendevous_servers("PARALLEL_MASTER", "%s")'
                % jobid)

    elif instance == 1:
        # Wait at the server barrier to wait for instance=0
        # process to complete setup
        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
                             port=sc_bar_port)
        b0.rendevous_servers("PARALLEL_MASTER", jobid)

        if(num_jobs > 2):
            # NOTE(review): rendvstr is one comma-joined string of quoted
            # names; presumably fine for the generated code below, but
            # verify it is what barrier.rendevous() expects here
            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
                                 port=s_bar_port)
            b1.rendevous(rendvstr)

    else:
        # For the rest of the clients
        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
        b2.rendevous(rendvstr)

    # Client side barrier for all the tests to start at the same time
    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
                       % (jobid, c_bar_timeout, c_bar_port))
    control_new.append("b1.rendevous(%s)" % rendvstr)

    # Stick in the rest of the control file
    control_new.append(control)

    return "\n".join(control_new)


class CmdResult(object):
    """
    Command execution result.

    command:     String containing the command line itself
    exit_status: Integer exit code of the process
    stdout:      String containing stdout of the process
    stderr:      String containing stderr of the process
    duration:    Elapsed wall clock time running the process
    """


    def __init__(self, command = None):
        self.command = command
        self.exit_status = None
        self.stdout = ""
        self.stderr = ""
        self.duration = 0


    def __repr__(self):
        # NOTE(review): indent strings reproduced as found; the original
        # widths may have been wider - confirm
        wrapper = textwrap.TextWrapper(width = 78,
                                       initial_indent="\n ",
                                       subsequent_indent=" ")

        stdout = self.stdout.rstrip()
        if stdout:
            stdout = "\nstdout:\n%s" % stdout

        stderr = self.stderr.rstrip()
        if stderr:
            stderr = "\nstderr:\n%s" % stderr

        # command defaults to None, which TextWrapper cannot wrap
        command = self.command or ""

        return ("* Command: %s\n"
                "Exit status: %s\n"
                "Duration: %s\n"
                "%s"
                "%s"
                % (wrapper.fill(command), self.exit_status,
                   self.duration, stdout, stderr))


class run_randomly:
    """
    Collect argument sets with add(), then have run() call a function
    once per set, in random order.
    """


    def __init__(self):
        self.test_list = []


    def add(self, *args, **dargs):
        """Queue one set of positional and keyword arguments."""
        test = (args, dargs)
        self.test_list.append(test)


    def run(self, fn):
        """
        Call fn(*args, **dargs) for every queued set, picking the next
        set at random and removing it, until the queue is empty.
        """
        while self.test_list:
            test_index = random.randint(0, len(self.test_list)-1)
            (args, dargs) = self.test_list.pop(test_index)
            fn(*args, **dargs)