utils.py revision ed91ba902a885505433827b2586117371632a76b
#!/usr/bin/python
#
# Copyright 2008 Google Inc. Released under the GPL v2

import os, pickle, random, re, resource, select, shutil, signal, StringIO
import socket, struct, subprocess, sys, time, textwrap, urllib, urlparse
import warnings
from autotest_lib.client.common_lib import error, barrier


def deprecated(func):
    """This is a decorator which can be used to mark functions as deprecated.
    It will result in a warning being emitted when the function is used."""
    def new_func(*args, **dargs):
        warnings.warn("Call to deprecated function %s." % func.__name__,
                      category=DeprecationWarning)
        return func(*args, **dargs)
    new_func.__name__ = func.__name__
    new_func.__doc__ = func.__doc__
    new_func.__dict__.update(func.__dict__)
    return new_func


class BgJob(object):
    def __init__(self, command, stdout_tee=None, stderr_tee=None,
                 verbose=True):
        self.command = command
        self.stdout_tee = stdout_tee
        self.stderr_tee = stderr_tee
        self.result = CmdResult(command)
        if verbose:
            print "running: %s" % command
        self.sp = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   preexec_fn=self._reset_sigpipe, shell=True,
                                   executable="/bin/bash")


    def output_prepare(self, stdout_file=None, stderr_file=None):
        self.stdout_file = stdout_file
        self.stderr_file = stderr_file


    def process_output(self, stdout=True, final_read=False):
        """output_prepare must be called prior to calling this"""
        if stdout:
            pipe, buf, tee = self.sp.stdout, self.stdout_file, self.stdout_tee
        else:
            pipe, buf, tee = self.sp.stderr, self.stderr_file, self.stderr_tee

        if final_read:
            # read in all the data we can from pipe and then stop
            data = []
            while select.select([pipe], [], [], 0)[0]:
                data.append(os.read(pipe.fileno(), 1024))
                if len(data[-1]) == 0:
                    break
            data = "".join(data)
        else:
            # perform a single read
            data = os.read(pipe.fileno(), 1024)
        buf.write(data)
        if tee:
            tee.write(data)
            tee.flush()


    def cleanup(self):
        self.sp.stdout.close()
        self.sp.stderr.close()
        self.result.stdout = self.stdout_file.getvalue()
        self.result.stderr = self.stderr_file.getvalue()


    def _reset_sigpipe(self):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)


def ip_to_long(ip):
    # !L is a long in network byte order
    return struct.unpack('!L', socket.inet_aton(ip))[0]


def long_to_ip(number):
    # See above comment.
    return socket.inet_ntoa(struct.pack('!L', number))


def create_subnet_mask(bits):
    # ~ does weird things in python...but this does work
    return (1 << 32) - (1 << 32 - bits)


def format_ip_with_mask(ip, mask_bits):
    masked_ip = ip_to_long(ip) & create_subnet_mask(mask_bits)
    return "%s/%s" % (long_to_ip(masked_ip), mask_bits)


def get_ip_local_port_range():
    match = re.match(r'\s*(\d+)\s*(\d+)\s*$',
                     read_one_line('/proc/sys/net/ipv4/ip_local_port_range'))
    return (int(match.group(1)), int(match.group(2)))


def set_ip_local_port_range(lower, upper):
    write_one_line('/proc/sys/net/ipv4/ip_local_port_range',
                   '%d %d\n' % (lower, upper))


def read_one_line(filename):
    return open(filename, 'r').readline().rstrip('\n')


def write_one_line(filename, line):
    open(filename, 'w').write(line.rstrip('\n') + '\n')
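

# Illustrative sketch, not part of the original module: a quick check of
# how the IP helpers above compose. The addresses and mask here are
# arbitrary example values.
def _example_ip_helpers():
    # 24 mask bits -> 0xFFFFFF00, i.e. a /24 netmask as a long
    assert create_subnet_mask(24) == 0xFFFFFF00
    # a round trip through the long representation is lossless
    assert long_to_ip(ip_to_long('10.1.2.3')) == '10.1.2.3'
    # masking clears the host bits before formatting
    assert format_ip_with_mask('192.168.1.77', 24) == '192.168.1.0/24'

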
def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    for line in open(path):
        line = re.sub('#.*', '', line).rstrip()
        if not re.search(r'^[-\w]+=', line):
            raise ValueError('Invalid format line: %s' % line)
        key, value = line.split('=', 1)
        if re.search(r'^\d+$', value):
            value = int(value)
        elif re.search(r'^(\d+\.)?\d+$', value):
            value = float(value)
        keyval[key] = value
    return keyval


def write_keyval(path, dictionary, type_tag=None):
    """
    Write a key-value pair format file out to a file. This uses append
    mode to open the file, so existing text will not be overwritten or
    reparsed.

    If type_tag is None, then the keys must be composed of alphanumeric
    characters (or dashes+underscores). However, if type_tag is not
    None, then the keys must also have "{type_tag}" as a suffix. At
    the moment the only valid values of type_tag are "attr" and "perf".
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = open(path, 'a')

    if type_tag is None:
        key_regex = re.compile(r'^[-\w]+$')
    else:
        if type_tag not in ('attr', 'perf'):
            raise ValueError('Invalid type tag: %s' % type_tag)
        escaped_tag = re.escape(type_tag)
        key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag)
    try:
        for key, value in dictionary.iteritems():
            if not key_regex.search(key):
                raise ValueError('Invalid key: %s' % key)
            keyval.write('%s=%s\n' % (key, value))
    finally:
        keyval.close()


def is_url(path):
    """Return true if path looks like a URL"""
    # for now, just handle http and ftp
    url_parts = urlparse.urlparse(path)
    return (url_parts[0] in ('http', 'ftp'))


def urlopen(url, data=None, proxies=None, timeout=5):
    """Wrapper to urllib.urlopen with timeout addition."""

    # Save old timeout
    old_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlopen(url, data=data, proxies=proxies)
    finally:
        socket.setdefaulttimeout(old_timeout)


def urlretrieve(url, filename=None, reporthook=None, data=None, timeout=300):
    """Wrapper to urllib.urlretrieve with timeout addition."""
    old_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlretrieve(url, filename=filename,
                                  reporthook=reporthook, data=data)
    finally:
        socket.setdefaulttimeout(old_timeout)


def get_file(src, dest, permissions=None):
    """Get a file from src, which can be local or a remote URL"""
    if src == dest:
        return
    if is_url(src):
        print 'PWD: ' + os.getcwd()
        print 'Fetching \n\t', src, '\n\t->', dest
        try:
            urllib.urlretrieve(src, dest)
        except IOError, e:
            raise error.AutotestError('Unable to retrieve %s (to %s)'
                                      % (src, dest), e)
    else:
        shutil.copyfile(src, dest)
    if permissions:
        os.chmod(dest, permissions)
    return dest
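

# Illustrative sketch, not part of the original module: a keyval round
# trip. The path is a hypothetical scratch file; write_keyval() opens in
# append mode, so a fresh file is assumed.
def _example_keyval_roundtrip():
    path = '/tmp/example_keyval'  # hypothetical path
    write_keyval(path, {'iterations': 3, 'throughput': 12.5})
    keyval = read_keyval(path)
    # read_keyval() converts purely numeric values back to int/float
    assert keyval['iterations'] == 3
    assert keyval['throughput'] == 12.5

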
def unmap_url(srcdir, src, destdir='.'):
    """
    Receives either a path to a local file or a URL.
    Returns either the path to the local file, or the destination
    path of the fetched file.

    unmap_url('/usr/src', 'foo.tar', '/tmp')
                            = '/usr/src/foo.tar'
    unmap_url('/usr/src', 'http://site/file', '/tmp')
                            = '/tmp/file'
                            (after retrieving it)
    """
    if is_url(src):
        url_parts = urlparse.urlparse(src)
        filename = os.path.basename(url_parts[2])
        dest = os.path.join(destdir, filename)
        return get_file(src, dest)
    else:
        return os.path.join(srcdir, src)


def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version

    If not, delete it and install() the new version.

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        old_version = pickle.load(open(versionfile))
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        if os.path.exists(srcdir):
            pickle.dump(new_version, open(versionfile, 'w'))


def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None, verbose=True):
    """
    Run a command on the host.

    Args:
        command: the command line string
        timeout: time limit in seconds before attempting to
                kill the running process. The run() function
                will take a few seconds longer than 'timeout'
                to complete if it has to kill the process.
        ignore_status: do not raise an exception, no matter what
                the exit code of the command is.
        stdout_tee: optional file-like object to which stdout data
                    will be written as it is generated (data will still
                    be stored in result.stdout)
        stderr_tee: likewise for stderr

    Returns:
        a CmdResult object

    Raises:
        CmdError: the exit code of the command
                execution was not 0
    """
    bg_job = join_bg_jobs((BgJob(command, stdout_tee, stderr_tee, verbose),),
                          timeout)[0]
    if not ignore_status and bg_job.result.exit_status:
        raise error.CmdError(command, bg_job.result,
                             "Command returned non-zero exit status")

    return bg_job.result


def run_parallel(commands, timeout=None, ignore_status=False,
                 stdout_tee=None, stderr_tee=None):
    """Behaves the same as run() with the following exceptions:

    - commands is a list of commands to run in parallel.
    - ignore_status toggles whether or not an exception should be raised
      on any error.

    Returns a list of CmdResult objects.
    """
    bg_jobs = []
    for command in commands:
        bg_jobs.append(BgJob(command, stdout_tee, stderr_tee))

    # Updates objects in bg_jobs list with their process information
    join_bg_jobs(bg_jobs, timeout)

    for bg_job in bg_jobs:
        if not ignore_status and bg_job.result.exit_status:
            raise error.CmdError(bg_job.command, bg_job.result,
                                 "Command returned non-zero exit status")

    return [bg_job.result for bg_job in bg_jobs]


@deprecated
def run_bg(command):
    """Function deprecated. Please use BgJob class instead."""
    bg_job = BgJob(command)
    return bg_job.sp, bg_job.result
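

# Illustrative sketch, not part of the original module: typical use of
# run() and run_parallel(). The commands are arbitrary examples.
def _example_run_usage():
    # A single command; a non-zero exit raises error.CmdError unless
    # ignore_status=True is passed.
    result = run('echo hello', timeout=10)
    assert result.exit_status == 0
    assert result.stdout == 'hello\n'

    # Several commands started together; results come back in the same
    # order as the command list.
    results = run_parallel(['true', 'true'], timeout=10)
    assert [r.exit_status for r in results] == [0, 0]

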
def join_bg_jobs(bg_jobs, timeout=None):
    """Joins the bg_jobs with the current thread.

    Returns the same list of bg_jobs objects that was passed in.
    """
    for bg_job in bg_jobs:
        bg_job.output_prepare(StringIO.StringIO(), StringIO.StringIO())

    try:
        # We are holding ends of the stdout and stderr pipes,
        # hence we need to be sure to close those fds no matter what
        start_time = time.time()
        timeout_error = _wait_for_commands(bg_jobs, start_time, timeout)

        for bg_job in bg_jobs:
            # Process stdout and stderr
            bg_job.process_output(stdout=True, final_read=True)
            bg_job.process_output(stdout=False, final_read=True)
    finally:
        # close our ends of the pipes to the sp no matter what
        for bg_job in bg_jobs:
            bg_job.cleanup()

    if timeout_error:
        # TODO: This needs to be fixed to better represent what happens when
        # running in parallel. However this is backwards compatible, so it
        # will do for the time being.
        raise error.CmdError(bg_jobs[0].command, bg_jobs[0].result,
                             "Command(s) did not complete within %d seconds"
                             % timeout)

    return bg_jobs


def _wait_for_commands(bg_jobs, start_time, timeout):
    # This returns True if it must return due to a timeout, otherwise False.

    # To check for processes which terminate without producing any output,
    # a 1 second timeout is used in select.
    SELECT_TIMEOUT = 1

    select_list = []
    reverse_dict = {}
    for bg_job in bg_jobs:
        select_list.append(bg_job.sp.stdout)
        select_list.append(bg_job.sp.stderr)
        reverse_dict[bg_job.sp.stdout] = (bg_job, True)
        reverse_dict[bg_job.sp.stderr] = (bg_job, False)

    if timeout:
        stop_time = start_time + timeout
        time_left = stop_time - time.time()
    else:
        time_left = None  # so that select never times out
    while not timeout or time_left > 0:
        # select will return when stdout is ready (including when it is
        # EOF, that is the process has terminated).
        ready, _, _ = select.select(select_list, [], [], SELECT_TIMEOUT)

        # os.read() has to be used instead of
        # subproc.stdout.read() which will otherwise block
        for file_obj in ready:
            bg_job, is_stdout = reverse_dict[file_obj]
            bg_job.process_output(is_stdout)

        remaining_jobs = [x for x in bg_jobs if x.result.exit_status is None]
        if len(remaining_jobs) == 0:
            return False
        for bg_job in remaining_jobs:
            bg_job.result.exit_status = bg_job.sp.poll()

        if timeout:
            time_left = stop_time - time.time()

    # Kill all processes which did not complete prior to timeout
    for bg_job in [x for x in bg_jobs if x.result.exit_status is None]:
        nuke_subprocess(bg_job.sp)

    return True


def nuke_subprocess(subproc):
    # check if the subprocess is still alive, first
    if subproc.poll() is not None:
        return subproc.poll()

    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(subproc.pid, sig)
        # The process may have died before we could kill it.
        except OSError:
            pass

        for i in range(5):
            rc = subproc.poll()
            if rc is not None:
                return rc
            time.sleep(1)
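

# Illustrative sketch, not part of the original module: driving BgJob
# and join_bg_jobs() directly, which is what run() does internally.
def _example_bg_job_usage():
    jobs = [BgJob('echo one', verbose=False),
            BgJob('echo two', verbose=False)]
    # join_bg_jobs() blocks until both jobs finish (or the timeout
    # expires) and fills in each job's .result
    join_bg_jobs(jobs, timeout=10)
    assert [job.result.stdout for job in jobs] == ['one\n', 'two\n']

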
def nuke_pid(pid):
    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(pid, sig)

        # The process may have died before we could kill it.
        except OSError:
            pass

    try:
        for i in range(5):
            status = os.waitpid(pid, os.WNOHANG)[0]
            if status == pid:
                return
            time.sleep(1)

        if status != pid:
            raise error.AutoservRunError('Could not kill %d' % pid, None)

    # the process died before we could join it
    except OSError:
        pass


def system(command, timeout=None, ignore_status=False):
    """This function returns the exit status of command."""
    return run(command, timeout, ignore_status,
               stdout_tee=sys.stdout, stderr_tee=sys.stderr).exit_status


def system_parallel(commands, timeout=None, ignore_status=False):
    """This function returns a list of exit statuses for the respective
    list of commands."""
    return [result.exit_status for result in
            run_parallel(commands, timeout, ignore_status,
                         stdout_tee=sys.stdout, stderr_tee=sys.stderr)]


def system_output(command, timeout=None, ignore_status=False,
                  retain_output=False):
    if retain_output:
        out = run(command, timeout, ignore_status,
                  stdout_tee=sys.stdout, stderr_tee=sys.stderr).stdout
    else:
        out = run(command, timeout, ignore_status).stdout
    if out[-1:] == '\n':
        out = out[:-1]
    return out


def system_output_parallel(commands, timeout=None, ignore_status=False,
                           retain_output=False):
    if retain_output:
        out = [result.stdout for result
               in run_parallel(commands, timeout, ignore_status,
                               stdout_tee=sys.stdout, stderr_tee=sys.stderr)]
    else:
        out = [result.stdout for result
               in run_parallel(commands, timeout, ignore_status)]
    # strip a single trailing newline from each command's output
    for i, x in enumerate(out):
        if x[-1:] == '\n':
            out[i] = x[:-1]
    return out


def get_cpu_percentage(function, *args, **dargs):
    """Returns a tuple containing the CPU% and return value from function call.

    This function calculates the usage time by taking the difference of
    the user and system times both before and after the function call.
    """
    child_pre = resource.getrusage(resource.RUSAGE_CHILDREN)
    self_pre = resource.getrusage(resource.RUSAGE_SELF)
    start = time.time()
    to_return = function(*args, **dargs)
    elapsed = time.time() - start
    self_post = resource.getrusage(resource.RUSAGE_SELF)
    child_post = resource.getrusage(resource.RUSAGE_CHILDREN)

    # Calculate CPU Percentage
    s_user, s_system = [a - b for a, b in zip(self_post, self_pre)[:2]]
    c_user, c_system = [a - b for a, b in zip(child_post, child_pre)[:2]]
    cpu_percent = (s_user + c_user + s_system + c_system) / elapsed

    return cpu_percent, to_return
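

# Illustrative sketch, not part of the original module: measuring the
# CPU share of a pure-Python busy loop with get_cpu_percentage(). The
# workload is an arbitrary example.
def _example_cpu_percentage():
    def busy_loop(n):
        total = 0
        for i in xrange(n):
            total += i
        return total
    cpu_percent, value = get_cpu_percentage(busy_loop, 1000000)
    assert value == 499999500000
    # a CPU-bound workload should account for most of the elapsed time
    assert cpu_percent > 0

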
<run test> 542 543 544PARAMS: 545 control_file : The control file which to which the above synchronization 546 code would be prepended to 547 host_name : The host name on which the job is going to run 548 host_num (non negative) : A number to identify the machine so that we have 549 different sets of s_bar_ports for each of the machines. 550 instance : The number of the job 551 num_jobs : Total number of jobs that are going to run in parallel with 552 this job starting at the same time 553 port_base : Port number that is used to derive the actual barrier ports. 554 555RETURN VALUE: 556 The modified control file. 557 558""" 559def get_sync_control_file(control, host_name, host_num, 560 instance, num_jobs, port_base=63100): 561 sc_bar_port = port_base 562 c_bar_port = port_base 563 if host_num < 0: 564 print "Please provide a non negative number for the host" 565 return None 566 s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are 567 # the same for a given machine 568 569 sc_bar_timeout = 180 570 s_bar_timeout = c_bar_timeout = 120 571 572 # The barrier code snippet is prepended into the conrol file 573 # dynamically before at.run() is called finally. 574 control_new = [] 575 576 # jobid is the unique name used to identify the processes 577 # trying to reach the barriers 578 jobid = "%s#%d" % (host_name, instance) 579 580 rendv = [] 581 # rendvstr is a temp holder for the rendezvous list of the processes 582 for n in range(num_jobs): 583 rendv.append("'%s#%d'" % (host_name, n)) 584 rendvstr = ",".join(rendv) 585 586 if instance == 0: 587 # Do the setup and wait at the server barrier 588 # Clean up the tmp and the control dirs for the first instance 589 control_new.append('if os.path.exists(job.tmpdir):') 590 control_new.append("\t system('umount -f %s > /dev/null" 591 "2> /dev/null' % job.tmpdir," 592 "ignore_status=True)") 593 control_new.append("\t system('rm -rf ' + job.tmpdir)") 594 control_new.append( 595 'b0 = job.barrier("%s", "sc_bar", %d, port=%d)' 596 % (jobid, sc_bar_timeout, sc_bar_port)) 597 control_new.append( 598 'b0.rendevous_servers("PARALLEL_MASTER", "%s")' 599 % jobid) 600 601 elif instance == 1: 602 # Wait at the server barrier to wait for instance=0 603 # process to complete setup 604 b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout, 605 port=sc_bar_port) 606 b0.rendevous_servers("PARALLEL_MASTER", jobid) 607 608 if(num_jobs > 2): 609 b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout, 610 port=s_bar_port) 611 b1.rendevous(rendvstr) 612 613 else: 614 # For the rest of the clients 615 b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port) 616 b2.rendevous(rendvstr) 617 618 # Client side barrier for all the tests to start at the same time 619 control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)' 620 % (jobid, c_bar_timeout, c_bar_port)) 621 control_new.append("b1.rendevous(%s)" % rendvstr) 622 623 # Stick in the rest of the control file 624 control_new.append(control) 625 626 return "\n".join(control_new) 627 628 629def get_arch(run_function=run): 630 """ 631 Get the hardware architecture of the machine. 632 run_function is used to execute the commands. It defaults to 633 utils.run() but a custom method (if provided) should be of the 634 same schema as utils.run. It should return a CmdResult object and 635 throw a CmdError exception. 
def get_arch(run_function=run):
    """
    Get the hardware architecture of the machine.
    run_function is used to execute the commands. It defaults to
    utils.run() but a custom method (if provided) should be of the
    same schema as utils.run. It should return a CmdResult object and
    throw a CmdError exception.
    """
    arch = run_function('/bin/uname -m').stdout.rstrip()
    if re.match(r'i\d86$', arch):
        arch = 'i386'
    return arch


class CmdResult(object):
    """
    Command execution result.

    command:     String containing the command line itself
    exit_status: Integer exit code of the process
    stdout:      String containing stdout of the process
    stderr:      String containing stderr of the process
    duration:    Elapsed wall clock time running the process
    """


    def __init__(self, command=None, stdout="", stderr="",
                 exit_status=None, duration=0):
        self.command = command
        self.exit_status = exit_status
        self.stdout = stdout
        self.stderr = stderr
        self.duration = duration


    def __repr__(self):
        wrapper = textwrap.TextWrapper(width=78,
                                       initial_indent="\n    ",
                                       subsequent_indent="    ")

        stdout = self.stdout.rstrip()
        if stdout:
            stdout = "\nstdout:\n%s" % stdout

        stderr = self.stderr.rstrip()
        if stderr:
            stderr = "\nstderr:\n%s" % stderr

        return ("* Command: %s\n"
                "Exit status: %s\n"
                "Duration: %s\n"
                "%s"
                "%s"
                % (wrapper.fill(self.command), self.exit_status,
                   self.duration, stdout, stderr))


class run_randomly(object):
    def __init__(self, run_sequentially=False):
        # Run sequentially is for debugging control files
        self.test_list = []
        self.run_sequentially = run_sequentially


    def add(self, *args, **dargs):
        test = (args, dargs)
        self.test_list.append(test)


    def run(self, fn):
        while self.test_list:
            test_index = random.randint(0, len(self.test_list) - 1)
            if self.run_sequentially:
                test_index = 0
            (args, dargs) = self.test_list.pop(test_index)
            fn(*args, **dargs)
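

# Illustrative sketch, not part of the original module: run_randomly
# queues (args, dargs) tuples and replays them through a callback, in
# random order by default or in insertion order with
# run_sequentially=True.
def _example_run_randomly():
    seen = []
    def record(name, iterations=1):
        seen.append((name, iterations))
    order = run_randomly(run_sequentially=True)
    order.add('test_a', iterations=2)
    order.add('test_b')
    order.run(record)
    # sequential mode preserves insertion order, which helps when
    # debugging control files
    assert seen == [('test_a', 2), ('test_b', 1)]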