utils.py revision c5ddfd1f71caef9ec0c84c53ef7db42fcdc33e1c
1#!/usr/bin/python 2# 3# Copyright 2008 Google Inc. Released under the GPL v2 4 5import os, pickle, random, re, select, shutil, signal, StringIO, subprocess 6import socket, sys, time, textwrap, urllib, urlparse 7import error, barrier 8 9 10def read_one_line(filename): 11 return open(filename, 'r').readline().rstrip('\n') 12 13 14def write_one_line(filename, str): 15 open(filename, 'w').write(str.rstrip('\n') + '\n') 16 17 18def read_keyval(path): 19 """ 20 Read a key-value pair format file into a dictionary, and return it. 21 Takes either a filename or directory name as input. If it's a 22 directory name, we assume you want the file to be called keyval. 23 """ 24 if os.path.isdir(path): 25 path = os.path.join(path, 'keyval') 26 keyval = {} 27 for line in open(path): 28 line = re.sub('#.*', '', line).rstrip() 29 if not re.search(r'^[-\w]+=', line): 30 raise ValueError('Invalid format line: %s' % line) 31 key, value = line.split('=', 1) 32 if re.search('^\d+$', value): 33 value = int(value) 34 elif re.search('^(\d+\.)?\d+$', value): 35 value = float(value) 36 keyval[key] = value 37 return keyval 38 39 40def write_keyval(path, dictionary, type_tag=None): 41 """ 42 Write a key-value pair format file out to a file. This uses append 43 mode to open the file, so existing text will not be overwritten or 44 reparsed. 45 46 If type_tag is None, then the key must be composed of alphanumeric 47 characters (or dashes+underscores). However, if type-tag is not 48 null then the keys must also have "{type_tag}" as a suffix. At 49 the moment the only valid values of type_tag are "attr" and "perf". 50 """ 51 if os.path.isdir(path): 52 path = os.path.join(path, 'keyval') 53 keyval = open(path, 'a') 54 55 if type_tag is None: 56 key_regex = re.compile(r'^[-\w]+$') 57 else: 58 if type_tag not in ('attr', 'perf'): 59 raise ValueError('Invalid type tag: %s' % type_tag) 60 escaped_tag = re.escape(type_tag) 61 key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag) 62 try: 63 for key, value in dictionary.iteritems(): 64 if not key_regex.search(key): 65 raise ValueError('Invalid key: %s' % key) 66 keyval.write('%s=%s\n' % (key, value)) 67 finally: 68 keyval.close() 69 70 71def is_url(path): 72 """Return true if path looks like a URL""" 73 # for now, just handle http and ftp 74 url_parts = urlparse.urlparse(path) 75 return (url_parts[0] in ('http', 'ftp')) 76 77 78def urlopen(url, data=None, proxies=None, timeout=300): 79 """Wrapper to urllib.urlopen with timeout addition.""" 80 81 # Save old timeout 82 old_timeout = socket.getdefaulttimeout() 83 socket.setdefaulttimeout(timeout) 84 try: 85 return urllib.urlopen(url, data=data, proxies=proxies) 86 finally: 87 socket.setdefaulttimeout(old_timeout) 88 89 90def urlretrieve(url, filename=None, reporthook=None, data=None, timeout=300): 91 """Wrapper to urllib.urlretrieve with timeout addition.""" 92 old_timeout = socket.getdefaulttimeout() 93 socket.setdefaulttimeout(timeout) 94 try: 95 return urllib.urlretrieve(url, filename=filename, 96 reporthook=reporthook, data=data) 97 finally: 98 socket.setdefaulttimeout(old_timeout) 99 100 101def get_file(src, dest, permissions=None): 102 """Get a file from src, which can be local or a remote URL""" 103 if (src == dest): 104 return 105 if (is_url(src)): 106 print 'PWD: ' + os.getcwd() 107 print 'Fetching \n\t', src, '\n\t->', dest 108 try: 109 urllib.urlretrieve(src, dest) 110 except IOError, e: 111 raise error.AutotestError('Unable to retrieve %s (to %s)' 112 % (src, dest), e) 113 else: 114 shutil.copyfile(src, dest) 115 if permissions: 116 os.chmod(dest, permissions) 117 return dest 118 119 120def unmap_url(srcdir, src, destdir='.'): 121 """ 122 Receives either a path to a local file or a URL. 123 returns either the path to the local file, or the fetched URL 124 125 unmap_url('/usr/src', 'foo.tar', '/tmp') 126 = '/usr/src/foo.tar' 127 unmap_url('/usr/src', 'http://site/file', '/tmp') 128 = '/tmp/file' 129 (after retrieving it) 130 """ 131 if is_url(src): 132 url_parts = urlparse.urlparse(src) 133 filename = os.path.basename(url_parts[2]) 134 dest = os.path.join(destdir, filename) 135 return get_file(src, dest) 136 else: 137 return os.path.join(srcdir, src) 138 139 140def update_version(srcdir, preserve_srcdir, new_version, install, 141 *args, **dargs): 142 """ 143 Make sure srcdir is version new_version 144 145 If not, delete it and install() the new version. 146 147 In the preserve_srcdir case, we just check it's up to date, 148 and if not, we rerun install, without removing srcdir 149 """ 150 versionfile = os.path.join(srcdir, '.version') 151 install_needed = True 152 153 if os.path.exists(versionfile): 154 old_version = pickle.load(open(versionfile)) 155 if old_version == new_version: 156 install_needed = False 157 158 if install_needed: 159 if not preserve_srcdir and os.path.exists(srcdir): 160 shutil.rmtree(srcdir) 161 install(*args, **dargs) 162 if os.path.exists(srcdir): 163 pickle.dump(new_version, open(versionfile, 'w')) 164 165 166def run(command, timeout=None, ignore_status=False, 167 stdout_tee=None, stderr_tee=None): 168 """ 169 Run a command on the host. 170 171 Args: 172 command: the command line string 173 timeout: time limit in seconds before attempting to 174 kill the running process. The run() function 175 will take a few seconds longer than 'timeout' 176 to complete if it has to kill the process. 177 ignore_status: do not raise an exception, no matter what 178 the exit code of the command is. 179 stdout_tee: optional file-like object to which stdout data 180 will be written as it is generated (data will still 181 be stored in result.stdout) 182 stderr_tee: likewise for stderr 183 184 Returns: 185 a CmdResult object 186 187 Raises: 188 CmdError: the exit code of the command 189 execution was not 0 190 """ 191 return join_bg_job(run_bg(command), command, timeout, ignore_status, 192 stdout_tee, stderr_tee) 193 194 195def run_bg(command): 196 """Run the command in a subprocess and return the subprocess.""" 197 result = CmdResult(command) 198 def reset_sigpipe(): 199 signal.signal(signal.SIGPIPE, signal.SIG_DFL) 200 sp = subprocess.Popen(command, stdout=subprocess.PIPE, 201 stderr=subprocess.PIPE, preexec_fn=reset_sigpipe, 202 shell=True, executable="/bin/bash") 203 return sp, result 204 205 206def join_bg_job(bg_job, command, timeout=None, ignore_status=False, 207 stdout_tee=None, stderr_tee=None): 208 """Join the subprocess with the current thread. See run description.""" 209 sp, result = bg_job 210 stdout_file = StringIO.StringIO() 211 stderr_file = StringIO.StringIO() 212 (ret, timeouterr) = (0, False) 213 214 try: 215 # We are holding ends to stdin, stdout pipes 216 # hence we need to be sure to close those fds no mater what 217 start_time = time.time() 218 (ret, timeouterr) = _wait_for_command(sp, start_time, 219 timeout, stdout_file, stderr_file, 220 stdout_tee, stderr_tee) 221 result.exit_status = ret 222 result.duration = time.time() - start_time 223 # don't use os.read now, so we get all the rest of the output 224 _process_output(sp.stdout, stdout_file, stdout_tee, final_read=True) 225 _process_output(sp.stderr, stderr_file, stderr_tee, final_read=True) 226 finally: 227 # close our ends of the pipes to the sp no matter what 228 sp.stdout.close() 229 sp.stderr.close() 230 231 result.stdout = stdout_file.getvalue() 232 result.stderr = stderr_file.getvalue() 233 234 if result.exit_status != 0: 235 if timeouterr: 236 raise error.CmdError(command, result, "Command did not " 237 "complete within %d seconds" % timeout) 238 elif not ignore_status: 239 raise error.CmdError(command, result, 240 "Command returned non-zero exit status") 241 242 return result 243 244# this returns a tuple with the return code and a flag to specify if the error 245# is due to the process not terminating within timeout 246def _wait_for_command(subproc, start_time, timeout, stdout_file, stderr_file, 247 stdout_tee, stderr_tee): 248 if timeout: 249 stop_time = start_time + timeout 250 time_left = stop_time - time.time() 251 else: 252 time_left = None # so that select never times out 253 while not timeout or time_left > 0: 254 # select will return when stdout is ready (including when it is 255 # EOF, that is the process has terminated). 256 ready, _, _ = select.select([subproc.stdout, subproc.stderr], 257 [], [], time_left) 258 # os.read() has to be used instead of 259 # subproc.stdout.read() which will otherwise block 260 if subproc.stdout in ready: 261 _process_output(subproc.stdout, stdout_file, stdout_tee) 262 if subproc.stderr in ready: 263 _process_output(subproc.stderr, stderr_file, stderr_tee) 264 265 exit_status_indication = subproc.poll() 266 267 if exit_status_indication is not None: 268 return (exit_status_indication, False) 269 270 if timeout: 271 time_left = stop_time - time.time() 272 273 # the process has not terminated within timeout, 274 # kill it via an escalating series of signals. 275 if exit_status_indication is None: 276 exit_status_indication = nuke_subprocess(subproc) 277 278 return (exit_status_indication, True) 279 280 281def nuke_subprocess(subproc): 282 # the process has not terminated within timeout, 283 # kill it via an escalating series of signals. 284 signal_queue = [signal.SIGTERM, signal.SIGKILL] 285 for sig in signal_queue: 286 try: 287 os.kill(subproc.pid, sig) 288 # The process may have died before we could kill it. 289 except OSError: 290 pass 291 292 for i in range(5): 293 rc = subproc.poll() 294 if rc != None: 295 return rc 296 time.sleep(1) 297 298 299def nuke_pid(pid): 300 # the process has not terminated within timeout, 301 # kill it via an escalating series of signals. 302 signal_queue = [signal.SIGTERM, signal.SIGKILL] 303 for sig in signal_queue: 304 try: 305 os.kill(pid, sig) 306 307 # The process may have died before we could kill it. 308 except OSError: 309 pass 310 311 try: 312 for i in range(5): 313 status = os.waitpid(pid, os.WNOHANG)[0] 314 if status == pid: 315 return 316 time.sleep(1) 317 318 if status != pid: 319 raise error.AutoservRunError('Could not kill %d' 320 % pid, None) 321 322 # the process died before we join it. 323 except OSError: 324 pass 325 326 327def _process_output(pipe, fbuffer, teefile=None, final_read=False): 328 if final_read: 329 # read in all the data we can from pipe and then stop 330 data = [] 331 while select.select([pipe], [], [], 0)[0]: 332 data.append(os.read(pipe.fileno(), 1024)) 333 if len(data[-1]) == 0: 334 break 335 data = "".join(data) 336 else: 337 # perform a single read 338 data = os.read(pipe.fileno(), 1024) 339 fbuffer.write(data) 340 if teefile: 341 teefile.write(data) 342 teefile.flush() 343 344 345def system(command, timeout=None, ignore_status=False): 346 return run(command, timeout, ignore_status, 347 stdout_tee=sys.stdout, stderr_tee=sys.stderr).exit_status 348 349 350def system_output(command, timeout=None, ignore_status=False, 351 retain_output=False): 352 if retain_output: 353 out = run(command, timeout, ignore_status, 354 stdout_tee=sys.stdout, stderr_tee=sys.stderr).stdout 355 else: 356 out = run(command, timeout, ignore_status).stdout 357 if out[-1:] == '\n': out = out[:-1] 358 return out 359 360""" 361This function is used when there is a need to run more than one 362job simultaneously starting exactly at the same time. It basically returns 363a modified control file (containing the synchronization code prepended) 364whenever it is ready to run the control file. The synchronization 365is done using barriers to make sure that the jobs start at the same time. 366 367Here is how the synchronization is done to make sure that the tests 368start at exactly the same time on the client. 369sc_bar is a server barrier and s_bar, c_bar are the normal barriers 370 371 Job1 Job2 ...... JobN 372 Server: | sc_bar 373 Server: | s_bar ...... s_bar 374 Server: | at.run() at.run() ...... at.run() 375 ----------|------------------------------------------------------ 376 Client | sc_bar 377 Client | c_bar c_bar ...... c_bar 378 Client | <run test> <run test> ...... <run test> 379 380 381PARAMS: 382 control_file : The control file which to which the above synchronization 383 code would be prepended to 384 host_name : The host name on which the job is going to run 385 host_num (non negative) : A number to identify the machine so that we have 386 different sets of s_bar_ports for each of the machines. 387 instance : The number of the job 388 num_jobs : Total number of jobs that are going to run in parallel with 389 this job starting at the same time 390 port_base : Port number that is used to derive the actual barrier ports. 391 392RETURN VALUE: 393 The modified control file. 394 395""" 396def get_sync_control_file(control, host_name, host_num, 397 instance, num_jobs, port_base=63100): 398 sc_bar_port = port_base 399 c_bar_port = port_base 400 if host_num < 0: 401 print "Please provide a non negative number for the host" 402 return None 403 s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are 404 # the same for a given machine 405 406 sc_bar_timeout = 180 407 s_bar_timeout = c_bar_timeout = 120 408 409 # The barrier code snippet is prepended into the conrol file 410 # dynamically before at.run() is called finally. 411 control_new = [] 412 413 # jobid is the unique name used to identify the processes 414 # trying to reach the barriers 415 jobid = "%s#%d" % (host_name, instance) 416 417 rendv = [] 418 # rendvstr is a temp holder for the rendezvous list of the processes 419 for n in range(num_jobs): 420 rendv.append("'%s#%d'" % (host_name, n)) 421 rendvstr = ",".join(rendv) 422 423 if instance == 0: 424 # Do the setup and wait at the server barrier 425 # Clean up the tmp and the control dirs for the first instance 426 control_new.append('if os.path.exists(job.tmpdir):') 427 control_new.append("\t system('umount -f %s > /dev/null" 428 "2> /dev/null' % job.tmpdir," 429 "ignore_status=True)") 430 control_new.append("\t system('rm -rf ' + job.tmpdir)") 431 control_new.append( 432 'b0 = job.barrier("%s", "sc_bar", %d, port=%d)' 433 % (jobid, sc_bar_timeout, sc_bar_port)) 434 control_new.append( 435 'b0.rendevous_servers("PARALLEL_MASTER", "%s")' 436 % jobid) 437 438 elif instance == 1: 439 # Wait at the server barrier to wait for instance=0 440 # process to complete setup 441 b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout, 442 port=sc_bar_port) 443 b0.rendevous_servers("PARALLEL_MASTER", jobid) 444 445 if(num_jobs > 2): 446 b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout, 447 port=s_bar_port) 448 b1.rendevous(rendvstr) 449 450 else: 451 # For the rest of the clients 452 b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port) 453 b2.rendevous(rendvstr) 454 455 # Client side barrier for all the tests to start at the same time 456 control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)' 457 % (jobid, c_bar_timeout, c_bar_port)) 458 control_new.append("b1.rendevous(%s)" % rendvstr) 459 460 # Stick in the rest of the control file 461 control_new.append(control) 462 463 return "\n".join(control_new) 464 465 466def get_arch(run_function=run): 467 """ 468 Get the hardware architecture of the machine. 469 run_function is used to execute the commands. It defaults to 470 utils.run() but a custom method (if provided) should be of the 471 same schema as utils.run. It should return a CmdResult object and 472 throw a CmdError exception. 473 """ 474 arch = run_function('/bin/uname -m').stdout.rstrip() 475 if re.match(r'i\d86$', arch): 476 arch = 'i386' 477 return arch 478 479 480class CmdResult(object): 481 """ 482 Command execution result. 483 484 command: String containing the command line itself 485 exit_status: Integer exit code of the process 486 stdout: String containing stdout of the process 487 stderr: String containing stderr of the process 488 duration: Elapsed wall clock time running the process 489 """ 490 491 492 def __init__(self, command=None, stdout="", stderr="", 493 exit_status=None, duration=0): 494 self.command = command 495 self.exit_status = exit_status 496 self.stdout = stdout 497 self.stderr = stderr 498 self.duration = duration 499 500 501 def __repr__(self): 502 wrapper = textwrap.TextWrapper(width = 78, 503 initial_indent="\n ", 504 subsequent_indent=" ") 505 506 stdout = self.stdout.rstrip() 507 if stdout: 508 stdout = "\nstdout:\n%s" % stdout 509 510 stderr = self.stderr.rstrip() 511 if stderr: 512 stderr = "\nstderr:\n%s" % stderr 513 514 return ("* Command: %s\n" 515 "Exit status: %s\n" 516 "Duration: %s\n" 517 "%s" 518 "%s" 519 % (wrapper.fill(self.command), self.exit_status, 520 self.duration, stdout, stderr)) 521 522 523class run_randomly: 524 def __init__(self, run_sequentially=False): 525 # Run sequentially is for debugging control files 526 self.test_list = [] 527 self.run_sequentially = run_sequentially 528 529 530 def add(self, *args, **dargs): 531 test = (args, dargs) 532 self.test_list.append(test) 533 534 535 def run(self, fn): 536 while self.test_list: 537 test_index = random.randint(0, len(self.test_list)-1) 538 if self.run_sequentially: 539 test_index = 0 540 (args, dargs) = self.test_list.pop(test_index) 541 fn(*args, **dargs) 542