utils.py revision e80d47159f8e43d6e0d736554e207833ec116dfa
#!/usr/bin/python
#
# Copyright 2008 Google Inc. Released under the GPL v2

import os, pickle, random, re, resource, select, shutil, signal, StringIO
import socket, struct, subprocess, sys, time, textwrap, urllib, urlparse
import warnings
from autotest_lib.client.common_lib import error, barrier


def deprecated(func):
    """This is a decorator which can be used to mark functions as deprecated.
    It will result in a warning being emitted when the function is used."""
    def new_func(*args, **dargs):
        warnings.warn("Call to deprecated function %s." % func.__name__,
                      category=DeprecationWarning)
        return func(*args, **dargs)
    new_func.__name__ = func.__name__
    new_func.__doc__ = func.__doc__
    new_func.__dict__.update(func.__dict__)
    return new_func


class BgJob(object):
    def __init__(self, command, stdout_tee=None, stderr_tee=None,
                 verbose=True):
        self.command = command
        self.stdout_tee = stdout_tee
        self.stderr_tee = stderr_tee
        self.result = CmdResult(command)
        if verbose:
            print "running: %s" % command
        self.sp = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   preexec_fn=self._reset_sigpipe, shell=True,
                                   executable="/bin/bash")


    def output_prepare(self, stdout_file=None, stderr_file=None):
        self.stdout_file = stdout_file
        self.stderr_file = stderr_file


    def process_output(self, stdout=True, final_read=False):
        """output_prepare must be called prior to calling this"""
        if stdout:
            pipe, buf, tee = self.sp.stdout, self.stdout_file, self.stdout_tee
        else:
            pipe, buf, tee = self.sp.stderr, self.stderr_file, self.stderr_tee

        if final_read:
            # read in all the data we can from pipe and then stop
            data = []
            while select.select([pipe], [], [], 0)[0]:
                data.append(os.read(pipe.fileno(), 1024))
                if len(data[-1]) == 0:
                    break
            data = "".join(data)
        else:
            # perform a single read
            data = os.read(pipe.fileno(), 1024)
        buf.write(data)
        if tee:
            tee.write(data)
            tee.flush()


    def cleanup(self):
        self.sp.stdout.close()
        self.sp.stderr.close()
        self.result.stdout = self.stdout_file.getvalue()
        self.result.stderr = self.stderr_file.getvalue()


    def _reset_sigpipe(self):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)


def ip_to_long(ip):
    # !L is an unsigned long in network byte order
    return struct.unpack('!L', socket.inet_aton(ip))[0]


def long_to_ip(number):
    # See the comment in ip_to_long.
    return socket.inet_ntoa(struct.pack('!L', number))
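# A minimal usage sketch for the two helpers above (the address is an
# illustrative value, not taken from this file):
#
#   >>> ip_to_long('10.0.0.1')
#   167772161
#   >>> long_to_ip(167772161)
#   '10.0.0.1'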
def create_subnet_mask(bits):
    # ~ on Python ints yields a negative number, so build the mask
    # arithmetically instead
    return (1 << 32) - (1 << 32 - bits)


def format_ip_with_mask(ip, mask_bits):
    masked_ip = ip_to_long(ip) & create_subnet_mask(mask_bits)
    return "%s/%s" % (long_to_ip(masked_ip), mask_bits)


def normalize_hostname(alias):
    ip = socket.gethostbyname(alias)
    return socket.gethostbyaddr(ip)[0]


def get_ip_local_port_range():
    match = re.match(r'\s*(\d+)\s*(\d+)\s*$',
                     read_one_line('/proc/sys/net/ipv4/ip_local_port_range'))
    return (int(match.group(1)), int(match.group(2)))


def set_ip_local_port_range(lower, upper):
    write_one_line('/proc/sys/net/ipv4/ip_local_port_range',
                   '%d %d\n' % (lower, upper))


def read_one_line(filename):
    return open(filename, 'r').readline().rstrip('\n')


def write_one_line(filename, line):
    open(filename, 'w').write(line.rstrip('\n') + '\n')


def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    for line in open(path):
        line = re.sub('#.*', '', line).rstrip()
        if not line:
            continue  # skip blank and comment-only lines
        if not re.search(r'^[-\w]+=', line):
            raise ValueError('Invalid format line: %s' % line)
        key, value = line.split('=', 1)
        if re.search(r'^\d+$', value):
            value = int(value)
        elif re.search(r'^(\d+\.)?\d+$', value):
            value = float(value)
        keyval[key] = value
    return keyval
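# A sketch of what read_keyval parses (file contents are illustrative only):
#
#   # contents of <resultsdir>/keyval:
#   #   iterations=3            -> parsed as int 3
#   #   throughput=12.5         -> parsed as float 12.5
#   #   kernel-version=2.6.18   -> left as the string '2.6.18'
#
#   >>> read_keyval('/path/to/resultsdir')
#   {'iterations': 3, 'throughput': 12.5, 'kernel-version': '2.6.18'}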
152 """ 153 if os.path.isdir(path): 154 path = os.path.join(path, 'keyval') 155 keyval = open(path, 'a') 156 157 if type_tag is None: 158 key_regex = re.compile(r'^[-\w]+$') 159 else: 160 if type_tag not in ('attr', 'perf'): 161 raise ValueError('Invalid type tag: %s' % type_tag) 162 escaped_tag = re.escape(type_tag) 163 key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag) 164 try: 165 for key, value in dictionary.iteritems(): 166 if not key_regex.search(key): 167 raise ValueError('Invalid key: %s' % key) 168 keyval.write('%s=%s\n' % (key, value)) 169 finally: 170 keyval.close() 171 172 173def is_url(path): 174 """Return true if path looks like a URL""" 175 # for now, just handle http and ftp 176 url_parts = urlparse.urlparse(path) 177 return (url_parts[0] in ('http', 'ftp')) 178 179 180def urlopen(url, data=None, proxies=None, timeout=5): 181 """Wrapper to urllib.urlopen with timeout addition.""" 182 183 # Save old timeout 184 old_timeout = socket.getdefaulttimeout() 185 socket.setdefaulttimeout(timeout) 186 try: 187 return urllib.urlopen(url, data=data, proxies=proxies) 188 finally: 189 socket.setdefaulttimeout(old_timeout) 190 191 192def urlretrieve(url, filename=None, reporthook=None, data=None, timeout=300): 193 """Wrapper to urllib.urlretrieve with timeout addition.""" 194 old_timeout = socket.getdefaulttimeout() 195 socket.setdefaulttimeout(timeout) 196 try: 197 return urllib.urlretrieve(url, filename=filename, 198 reporthook=reporthook, data=data) 199 finally: 200 socket.setdefaulttimeout(old_timeout) 201 202 203def get_file(src, dest, permissions=None): 204 """Get a file from src, which can be local or a remote URL""" 205 if (src == dest): 206 return 207 if (is_url(src)): 208 print 'PWD: ' + os.getcwd() 209 print 'Fetching \n\t', src, '\n\t->', dest 210 try: 211 urllib.urlretrieve(src, dest) 212 except IOError, e: 213 raise error.AutotestError('Unable to retrieve %s (to %s)' 214 % (src, dest), e) 215 else: 216 shutil.copyfile(src, dest) 217 if permissions: 218 os.chmod(dest, permissions) 219 return dest 220 221 222def unmap_url(srcdir, src, destdir='.'): 223 """ 224 Receives either a path to a local file or a URL. 225 returns either the path to the local file, or the fetched URL 226 227 unmap_url('/usr/src', 'foo.tar', '/tmp') 228 = '/usr/src/foo.tar' 229 unmap_url('/usr/src', 'http://site/file', '/tmp') 230 = '/tmp/file' 231 (after retrieving it) 232 """ 233 if is_url(src): 234 url_parts = urlparse.urlparse(src) 235 filename = os.path.basename(url_parts[2]) 236 dest = os.path.join(destdir, filename) 237 return get_file(src, dest) 238 else: 239 return os.path.join(srcdir, src) 240 241 242def update_version(srcdir, preserve_srcdir, new_version, install, 243 *args, **dargs): 244 """ 245 Make sure srcdir is version new_version 246 247 If not, delete it and install() the new version. 
def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is at version new_version.

    If not, delete it and run install() to build the new version.

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir.
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        old_version = pickle.load(open(versionfile))
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        if os.path.exists(srcdir):
            pickle.dump(new_version, open(versionfile, 'w'))


def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None, verbose=True):
    """
    Run a command on the host.

    Args:
        command: the command line string
        timeout: time limit in seconds before attempting to
                kill the running process. The run() function
                will take a few seconds longer than 'timeout'
                to complete if it has to kill the process.
        ignore_status: do not raise an exception, no matter what
                the exit code of the command is.
        stdout_tee: optional file-like object to which stdout data
                will be written as it is generated (data will still
                be stored in result.stdout)
        stderr_tee: likewise for stderr

    Returns:
        a CmdResult object

    Raises:
        CmdError: the exit code of the command
                execution was not 0
    """
    bg_job = join_bg_jobs((BgJob(command, stdout_tee, stderr_tee, verbose),),
                          timeout)[0]
    if not ignore_status and bg_job.result.exit_status:
        raise error.CmdError(command, bg_job.result,
                             "Command returned non-zero exit status")

    return bg_job.result


def run_parallel(commands, timeout=None, ignore_status=False,
                 stdout_tee=None, stderr_tee=None):
    """Behaves the same as run() with the following exceptions:

    - commands is a list of commands to run in parallel.
    - ignore_status toggles whether or not an exception should be raised
      on any error.

    Returns a list of CmdResult objects.
    """
    bg_jobs = []
    for command in commands:
        bg_jobs.append(BgJob(command, stdout_tee, stderr_tee))

    # Updates objects in bg_jobs list with their process information
    join_bg_jobs(bg_jobs, timeout)

    for bg_job in bg_jobs:
        if not ignore_status and bg_job.result.exit_status:
            raise error.CmdError(bg_job.command, bg_job.result,
                                 "Command returned non-zero exit status")

    return [bg_job.result for bg_job in bg_jobs]


@deprecated
def run_bg(command):
    """Function deprecated. Please use BgJob class instead."""
    bg_job = BgJob(command)
    return bg_job.sp, bg_job.result
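# A minimal usage sketch for run() and run_parallel() (the commands are
# illustrative only):
#
#   result = run('uname -r', timeout=30)
#   print result.stdout
#
#   results = run_parallel(['sleep 1', 'hostname'], timeout=60)
#   statuses = [r.exit_status for r in results]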
337 """ 338 ret, timeouterr = 0, False 339 for bg_job in bg_jobs: 340 bg_job.output_prepare(StringIO.StringIO(), StringIO.StringIO()) 341 342 try: 343 # We are holding ends to stdin, stdout pipes 344 # hence we need to be sure to close those fds no mater what 345 start_time = time.time() 346 timeout_error = _wait_for_commands(bg_jobs, start_time, timeout) 347 348 for bg_job in bg_jobs: 349 # Process stdout and stderr 350 bg_job.process_output(stdout=True,final_read=True) 351 bg_job.process_output(stdout=False,final_read=True) 352 finally: 353 # close our ends of the pipes to the sp no matter what 354 for bg_job in bg_jobs: 355 bg_job.cleanup() 356 357 if timeout_error: 358 # TODO: This needs to be fixed to better represent what happens when 359 # running in parallel. However this is backwards compatable, so it will 360 # do for the time being. 361 raise error.CmdError(bg_jobs[0].command, bg_jobs[0].result, 362 "Command(s) did not complete within %d seconds" 363 % timeout) 364 365 366 return bg_jobs 367 368 369def _wait_for_commands(bg_jobs, start_time, timeout): 370 # This returns True if it must return due to a timeout, otherwise False. 371 372 # To check for processes which terminate without producing any output 373 # a 1 second timeout is used in select. 374 SELECT_TIMEOUT = 1 375 376 select_list = [] 377 reverse_dict = {} 378 for bg_job in bg_jobs: 379 select_list.append(bg_job.sp.stdout) 380 select_list.append(bg_job.sp.stderr) 381 reverse_dict[bg_job.sp.stdout] = (bg_job,True) 382 reverse_dict[bg_job.sp.stderr] = (bg_job,False) 383 384 if timeout: 385 stop_time = start_time + timeout 386 time_left = stop_time - time.time() 387 else: 388 time_left = None # so that select never times out 389 while not timeout or time_left > 0: 390 # select will return when stdout is ready (including when it is 391 # EOF, that is the process has terminated). 392 ready, _, _ = select.select(select_list, [], [], SELECT_TIMEOUT) 393 394 # os.read() has to be used instead of 395 # subproc.stdout.read() which will otherwise block 396 for fileno in ready: 397 bg_job,stdout = reverse_dict[fileno] 398 bg_job.process_output(stdout) 399 400 remaining_jobs = [x for x in bg_jobs if x.result.exit_status is None] 401 if len(remaining_jobs) == 0: 402 return False 403 for bg_job in remaining_jobs: 404 bg_job.result.exit_status = bg_job.sp.poll() 405 406 if timeout: 407 time_left = stop_time - time.time() 408 409 # Kill all processes which did not complete prior to timeout 410 for bg_job in [x for x in bg_jobs if x.result.exit_status is None]: 411 nuke_subprocess(bg_job.sp) 412 bg_job.result.exit_status = bg_job.sp.poll() 413 414 return True 415 416 417def nuke_subprocess(subproc): 418 # check if the subprocess is still alive, first 419 if subproc.poll() is not None: 420 return subproc.poll() 421 422 # the process has not terminated within timeout, 423 # kill it via an escalating series of signals. 424 signal_queue = [signal.SIGTERM, signal.SIGKILL] 425 for sig in signal_queue: 426 try: 427 os.kill(subproc.pid, sig) 428 # The process may have died before we could kill it. 429 except OSError: 430 pass 431 432 for i in range(5): 433 rc = subproc.poll() 434 if rc != None: 435 return rc 436 time.sleep(1) 437 438 439def nuke_pid(pid): 440 # the process has not terminated within timeout, 441 # kill it via an escalating series of signals. 442 signal_queue = [signal.SIGTERM, signal.SIGKILL] 443 for sig in signal_queue: 444 try: 445 os.kill(pid, sig) 446 447 # The process may have died before we could kill it. 
def nuke_pid(pid):
    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(pid, sig)

        # The process may have died before we could kill it.
        except OSError:
            pass

        try:
            for i in range(5):
                status = os.waitpid(pid, os.WNOHANG)[0]
                if status == pid:
                    return
                time.sleep(1)

            if status != pid:
                raise error.AutoservRunError('Could not kill %d' % pid,
                                             None)

        # the process died before we could join it
        except OSError:
            pass


def system(command, timeout=None, ignore_status=False):
    """This function returns the exit status of command."""
    return run(command, timeout, ignore_status,
               stdout_tee=sys.stdout, stderr_tee=sys.stderr).exit_status


def system_parallel(commands, timeout=None, ignore_status=False):
    """This function returns a list of exit statuses for the respective
    list of commands."""
    return [result.exit_status for result in
            run_parallel(commands, timeout, ignore_status,
                         stdout_tee=sys.stdout, stderr_tee=sys.stderr)]


def system_output(command, timeout=None, ignore_status=False,
                  retain_output=False):
    if retain_output:
        out = run(command, timeout, ignore_status,
                  stdout_tee=sys.stdout, stderr_tee=sys.stderr).stdout
    else:
        out = run(command, timeout, ignore_status).stdout
    if out[-1:] == '\n':
        out = out[:-1]
    return out


def system_output_parallel(commands, timeout=None, ignore_status=False,
                           retain_output=False):
    if retain_output:
        results = run_parallel(commands, timeout, ignore_status,
                               stdout_tee=sys.stdout, stderr_tee=sys.stderr)
    else:
        results = run_parallel(commands, timeout, ignore_status)
    # strip a single trailing newline from each command's output
    out = []
    for result in results:
        stdout = result.stdout
        if stdout[-1:] == '\n':
            stdout = stdout[:-1]
        out.append(stdout)
    return out


def get_cpu_percentage(function, *args, **dargs):
    """Returns a tuple containing the CPU% and return value from function call.

    This function calculates the usage time by taking the difference of
    the user and system times both before and after the function call.
    """
    child_pre = resource.getrusage(resource.RUSAGE_CHILDREN)
    self_pre = resource.getrusage(resource.RUSAGE_SELF)
    start = time.time()
    to_return = function(*args, **dargs)
    elapsed = time.time() - start
    self_post = resource.getrusage(resource.RUSAGE_SELF)
    child_post = resource.getrusage(resource.RUSAGE_CHILDREN)

    # Calculate CPU Percentage
    s_user, s_system = [a - b for a, b in zip(self_post, self_pre)[:2]]
    c_user, c_system = [a - b for a, b in zip(child_post, child_pre)[:2]]
    cpu_percent = (s_user + c_user + s_system + c_system) / elapsed

    return cpu_percent, to_return
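# A usage sketch for get_cpu_percentage (the workload is illustrative only):
#
#   def busy_loop(n):
#       total = 0
#       for i in xrange(n):
#           total += i
#       return total
#
#   cpu_percent, value = get_cpu_percentage(busy_loop, 10000000)
#   # cpu_percent is a fraction (1.0 == one fully busy CPU for the whole
#   # wall-clock interval); value is busy_loop's return value.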
def get_sync_control_file(control, host_name, host_num,
                          instance, num_jobs, port_base=63100):
    """
    This function is used when there is a need to run more than one
    job simultaneously starting exactly at the same time. It basically
    returns a modified control file (containing the synchronization code
    prepended) whenever it is ready to run the control file. The
    synchronization is done using barriers to make sure that the jobs
    start at the same time.

    Here is how the synchronization is done to make sure that the tests
    start at exactly the same time on the client.
    sc_bar is a server barrier and s_bar, c_bar are the normal barriers

                  Job1         Job2         ......      JobN
     Server:   |  sc_bar
     Server:   |  s_bar        s_bar        ......      s_bar
     Server:   |  at.run()     at.run()     ......      at.run()
     ----------|--------------------------------------------------
     Client    |  sc_bar
     Client    |  c_bar        c_bar        ......      c_bar
     Client    |  <run test>   <run test>   ......      <run test>

    PARAMS:
       control_file : The control file to which the above synchronization
                      code is prepended
       host_name : The host name on which the job is going to run
       host_num (non-negative) : A number to identify the machine so that
                      we have different sets of s_bar_ports for each of
                      the machines
       instance : The number of the job
       num_jobs : Total number of jobs that are going to run in parallel
                      with this job starting at the same time
       port_base : Port number that is used to derive the actual barrier
                      ports

    RETURN VALUE:
        The modified control file.
    """
    sc_bar_port = port_base
    c_bar_port = port_base
    if host_num < 0:
        print "Please provide a non-negative number for the host"
        return None
    # the set of s_bar_ports is the same for a given machine
    s_bar_port = port_base + 1 + host_num

    sc_bar_timeout = 180
    s_bar_timeout = c_bar_timeout = 120

    # The barrier code snippet is prepended into the control file
    # dynamically before at.run() is called finally.
    control_new = []

    # jobid is the unique name used to identify the processes
    # trying to reach the barriers
    jobid = "%s#%d" % (host_name, instance)

    # rendvstr holds the rendezvous list of the processes
    rendv = []
    for n in range(num_jobs):
        rendv.append("'%s#%d'" % (host_name, n))
    rendvstr = ",".join(rendv)

    if instance == 0:
        # Do the setup and wait at the server barrier
        # Clean up the tmp and the control dirs for the first instance
        control_new.append('if os.path.exists(job.tmpdir):')
        control_new.append("\t system('umount -f %s > /dev/null "
                           "2> /dev/null' % job.tmpdir, "
                           "ignore_status=True)")
        control_new.append("\t system('rm -rf ' + job.tmpdir)")
        control_new.append(
            'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
            % (jobid, sc_bar_timeout, sc_bar_port))
        control_new.append(
            'b0.rendevous_servers("PARALLEL_MASTER", "%s")'
            % jobid)

    elif instance == 1:
        # Wait at the server barrier for the instance=0
        # process to complete setup
        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
                             port=sc_bar_port)
        b0.rendevous_servers("PARALLEL_MASTER", jobid)

        if num_jobs > 2:
            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
                                 port=s_bar_port)
            b1.rendevous(rendvstr)

    else:
        # For the rest of the clients
        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
        b2.rendevous(rendvstr)

    # Client side barrier for all the tests to start at the same time
    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
                       % (jobid, c_bar_timeout, c_bar_port))
    control_new.append("b1.rendevous(%s)" % rendvstr)

    # Stick in the rest of the control file
    control_new.append(control)

    return "\n".join(control_new)
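# A sketch of how a server-side harness might call the function above
# (the names hosts/base_control are illustrative, not from this file):
# each of N parallel jobs gets its own synchronized control file before
# being handed to the client.
#
#   for i, host in enumerate(hosts):
#       control = get_sync_control_file(base_control, host.hostname,
#                                       host_num=i, instance=i,
#                                       num_jobs=len(hosts))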
643 """ 644 arch = run_function('/bin/uname -m').stdout.rstrip() 645 if re.match(r'i\d86$', arch): 646 arch = 'i386' 647 return arch 648 649 650class CmdResult(object): 651 """ 652 Command execution result. 653 654 command: String containing the command line itself 655 exit_status: Integer exit code of the process 656 stdout: String containing stdout of the process 657 stderr: String containing stderr of the process 658 duration: Elapsed wall clock time running the process 659 """ 660 661 662 def __init__(self, command=None, stdout="", stderr="", 663 exit_status=None, duration=0): 664 self.command = command 665 self.exit_status = exit_status 666 self.stdout = stdout 667 self.stderr = stderr 668 self.duration = duration 669 670 671 def __repr__(self): 672 wrapper = textwrap.TextWrapper(width = 78, 673 initial_indent="\n ", 674 subsequent_indent=" ") 675 676 stdout = self.stdout.rstrip() 677 if stdout: 678 stdout = "\nstdout:\n%s" % stdout 679 680 stderr = self.stderr.rstrip() 681 if stderr: 682 stderr = "\nstderr:\n%s" % stderr 683 684 return ("* Command: %s\n" 685 "Exit status: %s\n" 686 "Duration: %s\n" 687 "%s" 688 "%s" 689 % (wrapper.fill(self.command), self.exit_status, 690 self.duration, stdout, stderr)) 691 692 693class run_randomly: 694 def __init__(self, run_sequentially=False): 695 # Run sequentially is for debugging control files 696 self.test_list = [] 697 self.run_sequentially = run_sequentially 698 699 700 def add(self, *args, **dargs): 701 test = (args, dargs) 702 self.test_list.append(test) 703 704 705 def run(self, fn): 706 while self.test_list: 707 test_index = random.randint(0, len(self.test_list)-1) 708 if self.run_sequentially: 709 test_index = 0 710 (args, dargs) = self.test_list.pop(test_index) 711 fn(*args, **dargs) 712