utils.py revision f989e58f218b3f2597d6320f65a5c37f4a9a6ebd
1# 2# Copyright 2008 Google Inc. Released under the GPL v2 3 4import os, pickle, random, re, resource, select, shutil, signal, StringIO 5import socket, struct, subprocess, sys, time, textwrap, urlparse 6import warnings, smtplib, logging, urllib2 7try: 8 import hashlib 9except ImportError: 10 import md5, sha 11from autotest_lib.client.common_lib import error, barrier, logging_manager 12 13def deprecated(func): 14 """This is a decorator which can be used to mark functions as deprecated. 15 It will result in a warning being emmitted when the function is used.""" 16 def new_func(*args, **dargs): 17 warnings.warn("Call to deprecated function %s." % func.__name__, 18 category=DeprecationWarning) 19 return func(*args, **dargs) 20 new_func.__name__ = func.__name__ 21 new_func.__doc__ = func.__doc__ 22 new_func.__dict__.update(func.__dict__) 23 return new_func 24 25 26class _NullStream(object): 27 def write(self, data): 28 pass 29 30 31 def flush(self): 32 pass 33 34 35TEE_TO_LOGS = object() 36_the_null_stream = _NullStream() 37 38DEFAULT_STDOUT_LEVEL = logging.DEBUG 39DEFAULT_STDERR_LEVEL = logging.ERROR 40 41# prefixes for logging stdout/stderr of commands 42STDOUT_PREFIX = '[stdout] ' 43STDERR_PREFIX = '[stderr] ' 44 45 46def get_stream_tee_file(stream, level, prefix=''): 47 if stream is None: 48 return _the_null_stream 49 if stream is TEE_TO_LOGS: 50 return logging_manager.LoggingFile(level=level, prefix=prefix) 51 return stream 52 53 54class BgJob(object): 55 def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True, 56 stdin=None, stderr_level=DEFAULT_STDERR_LEVEL): 57 self.command = command 58 self.stdout_tee = get_stream_tee_file(stdout_tee, DEFAULT_STDOUT_LEVEL, 59 prefix=STDOUT_PREFIX) 60 self.stderr_tee = get_stream_tee_file(stderr_tee, stderr_level, 61 prefix=STDERR_PREFIX) 62 self.result = CmdResult(command) 63 64 # allow for easy stdin input by string, we'll let subprocess create 65 # a pipe for stdin input and we'll write to it in the wait loop 66 if isinstance(stdin, basestring): 67 self.string_stdin = stdin 68 stdin = subprocess.PIPE 69 else: 70 self.string_stdin = None 71 72 if verbose: 73 logging.debug("Running '%s'" % command) 74 self.sp = subprocess.Popen(command, stdout=subprocess.PIPE, 75 stderr=subprocess.PIPE, 76 preexec_fn=self._reset_sigpipe, shell=True, 77 executable="/bin/bash", 78 stdin=stdin) 79 80 81 def output_prepare(self, stdout_file=None, stderr_file=None): 82 self.stdout_file = stdout_file 83 self.stderr_file = stderr_file 84 85 86 def process_output(self, stdout=True, final_read=False): 87 """output_prepare must be called prior to calling this""" 88 if stdout: 89 pipe, buf, tee = self.sp.stdout, self.stdout_file, self.stdout_tee 90 else: 91 pipe, buf, tee = self.sp.stderr, self.stderr_file, self.stderr_tee 92 93 if final_read: 94 # read in all the data we can from pipe and then stop 95 data = [] 96 while select.select([pipe], [], [], 0)[0]: 97 data.append(os.read(pipe.fileno(), 1024)) 98 if len(data[-1]) == 0: 99 break 100 data = "".join(data) 101 else: 102 # perform a single read 103 data = os.read(pipe.fileno(), 1024) 104 buf.write(data) 105 tee.write(data) 106 107 108 def cleanup(self): 109 self.stdout_tee.flush() 110 self.stderr_tee.flush() 111 self.sp.stdout.close() 112 self.sp.stderr.close() 113 self.result.stdout = self.stdout_file.getvalue() 114 self.result.stderr = self.stderr_file.getvalue() 115 116 117 def _reset_sigpipe(self): 118 signal.signal(signal.SIGPIPE, signal.SIG_DFL) 119 120 121def ip_to_long(ip): 122 # !L is a long in network byte order 123 return struct.unpack('!L', socket.inet_aton(ip))[0] 124 125 126def long_to_ip(number): 127 # See above comment. 128 return socket.inet_ntoa(struct.pack('!L', number)) 129 130 131def create_subnet_mask(bits): 132 return (1 << 32) - (1 << 32-bits) 133 134 135def format_ip_with_mask(ip, mask_bits): 136 masked_ip = ip_to_long(ip) & create_subnet_mask(mask_bits) 137 return "%s/%s" % (long_to_ip(masked_ip), mask_bits) 138 139 140def normalize_hostname(alias): 141 ip = socket.gethostbyname(alias) 142 return socket.gethostbyaddr(ip)[0] 143 144 145def get_ip_local_port_range(): 146 match = re.match(r'\s*(\d+)\s*(\d+)\s*$', 147 read_one_line('/proc/sys/net/ipv4/ip_local_port_range')) 148 return (int(match.group(1)), int(match.group(2))) 149 150 151def set_ip_local_port_range(lower, upper): 152 write_one_line('/proc/sys/net/ipv4/ip_local_port_range', 153 '%d %d\n' % (lower, upper)) 154 155 156 157def send_email(mail_from, mail_to, subject, body): 158 """ 159 Sends an email via smtp 160 161 mail_from: string with email address of sender 162 mail_to: string or list with email address(es) of recipients 163 subject: string with subject of email 164 body: (multi-line) string with body of email 165 """ 166 if isinstance(mail_to, str): 167 mail_to = [mail_to] 168 msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (mail_from, ','.join(mail_to), 169 subject, body) 170 try: 171 mailer = smtplib.SMTP('localhost') 172 try: 173 mailer.sendmail(mail_from, mail_to, msg) 174 finally: 175 mailer.quit() 176 except Exception, e: 177 # Emails are non-critical, not errors, but don't raise them 178 print "Sending email failed. Reason: %s" % repr(e) 179 180 181def read_one_line(filename): 182 return open(filename, 'r').readline().rstrip('\n') 183 184 185def read_file(filename): 186 f = open(filename) 187 try: 188 return f.read() 189 finally: 190 f.close() 191 192 193def write_one_line(filename, line): 194 open_write_close(filename, line.rstrip('\n') + '\n') 195 196 197def open_write_close(filename, data): 198 f = open(filename, 'w') 199 try: 200 f.write(data) 201 finally: 202 f.close() 203 204 205def read_keyval(path): 206 """ 207 Read a key-value pair format file into a dictionary, and return it. 208 Takes either a filename or directory name as input. If it's a 209 directory name, we assume you want the file to be called keyval. 210 """ 211 if os.path.isdir(path): 212 path = os.path.join(path, 'keyval') 213 keyval = {} 214 if os.path.exists(path): 215 for line in open(path): 216 line = re.sub('#.*', '', line).rstrip() 217 if not re.search(r'^[-\.\w]+=', line): 218 raise ValueError('Invalid format line: %s' % line) 219 key, value = line.split('=', 1) 220 if re.search('^\d+$', value): 221 value = int(value) 222 elif re.search('^(\d+\.)?\d+$', value): 223 value = float(value) 224 keyval[key] = value 225 return keyval 226 227 228def write_keyval(path, dictionary, type_tag=None): 229 """ 230 Write a key-value pair format file out to a file. This uses append 231 mode to open the file, so existing text will not be overwritten or 232 reparsed. 233 234 If type_tag is None, then the key must be composed of alphanumeric 235 characters (or dashes+underscores). However, if type-tag is not 236 null then the keys must also have "{type_tag}" as a suffix. At 237 the moment the only valid values of type_tag are "attr" and "perf". 238 """ 239 if os.path.isdir(path): 240 path = os.path.join(path, 'keyval') 241 keyval = open(path, 'a') 242 243 if type_tag is None: 244 key_regex = re.compile(r'^[-\.\w]+$') 245 else: 246 if type_tag not in ('attr', 'perf'): 247 raise ValueError('Invalid type tag: %s' % type_tag) 248 escaped_tag = re.escape(type_tag) 249 key_regex = re.compile(r'^[-\.\w]+\{%s\}$' % escaped_tag) 250 try: 251 for key in sorted(dictionary.keys()): 252 if not key_regex.search(key): 253 raise ValueError('Invalid key: %s' % key) 254 keyval.write('%s=%s\n' % (key, dictionary[key])) 255 finally: 256 keyval.close() 257 258 259def is_url(path): 260 """Return true if path looks like a URL""" 261 # for now, just handle http and ftp 262 url_parts = urlparse.urlparse(path) 263 return (url_parts[0] in ('http', 'ftp')) 264 265 266def urlopen(url, data=None, timeout=5): 267 """Wrapper to urllib2.urlopen with timeout addition.""" 268 269 # Save old timeout 270 old_timeout = socket.getdefaulttimeout() 271 socket.setdefaulttimeout(timeout) 272 try: 273 return urllib2.urlopen(url, data=data) 274 finally: 275 socket.setdefaulttimeout(old_timeout) 276 277 278def urlretrieve(url, filename, data=None, timeout=300): 279 """Retrieve a file from given url.""" 280 logging.debug('Fetching %s -> %s', url, filename) 281 282 src_file = urlopen(url, data=data, timeout=timeout) 283 try: 284 dest_file = open(filename, 'wb') 285 try: 286 shutil.copyfileobj(src_file, dest_file) 287 finally: 288 dest_file.close() 289 finally: 290 src_file.close() 291 292 293def hash(type, input=None): 294 """ 295 Returns an hash object of type md5 or sha1. This function is implemented in 296 order to encapsulate hash objects in a way that is compatible with python 297 2.4 and python 2.6 without warnings. 298 299 Note that even though python 2.6 hashlib supports hash types other than 300 md5 and sha1, we are artificially limiting the input values in order to 301 make the function to behave exactly the same among both python 302 implementations. 303 304 @param input: Optional input string that will be used to update the hash. 305 """ 306 if type not in ['md5', 'sha1']: 307 raise ValueError("Unsupported hash type: %s" % type) 308 309 try: 310 hash = hashlib.new(type) 311 except NameError: 312 if type == 'md5': 313 hash = md5.new() 314 elif type == 'sha1': 315 hash = sha.new() 316 317 if input: 318 hash.update(input) 319 320 return hash 321 322 323def get_file(src, dest, permissions=None): 324 """Get a file from src, which can be local or a remote URL""" 325 if src == dest: 326 return 327 328 if is_url(src): 329 urlretrieve(src, dest) 330 else: 331 shutil.copyfile(src, dest) 332 333 if permissions: 334 os.chmod(dest, permissions) 335 return dest 336 337 338def unmap_url(srcdir, src, destdir='.'): 339 """ 340 Receives either a path to a local file or a URL. 341 returns either the path to the local file, or the fetched URL 342 343 unmap_url('/usr/src', 'foo.tar', '/tmp') 344 = '/usr/src/foo.tar' 345 unmap_url('/usr/src', 'http://site/file', '/tmp') 346 = '/tmp/file' 347 (after retrieving it) 348 """ 349 if is_url(src): 350 url_parts = urlparse.urlparse(src) 351 filename = os.path.basename(url_parts[2]) 352 dest = os.path.join(destdir, filename) 353 return get_file(src, dest) 354 else: 355 return os.path.join(srcdir, src) 356 357 358def update_version(srcdir, preserve_srcdir, new_version, install, 359 *args, **dargs): 360 """ 361 Make sure srcdir is version new_version 362 363 If not, delete it and install() the new version. 364 365 In the preserve_srcdir case, we just check it's up to date, 366 and if not, we rerun install, without removing srcdir 367 """ 368 versionfile = os.path.join(srcdir, '.version') 369 install_needed = True 370 371 if os.path.exists(versionfile): 372 old_version = pickle.load(open(versionfile)) 373 if old_version == new_version: 374 install_needed = False 375 376 if install_needed: 377 if not preserve_srcdir and os.path.exists(srcdir): 378 shutil.rmtree(srcdir) 379 install(*args, **dargs) 380 if os.path.exists(srcdir): 381 pickle.dump(new_version, open(versionfile, 'w')) 382 383 384def get_stderr_level(stderr_is_expected): 385 if stderr_is_expected: 386 return DEFAULT_STDOUT_LEVEL 387 return DEFAULT_STDERR_LEVEL 388 389 390def run(command, timeout=None, ignore_status=False, 391 stdout_tee=None, stderr_tee=None, verbose=True, stdin=None, 392 stderr_is_expected=None, args=()): 393 """ 394 Run a command on the host. 395 396 @param command: the command line string. 397 @param timeout: time limit in seconds before attempting to kill the 398 running process. The run() function will take a few seconds 399 longer than 'timeout' to complete if it has to kill the process. 400 @param ignore_status: do not raise an exception, no matter what the exit 401 code of the command is. 402 @param stdout_tee: optional file-like object to which stdout data 403 will be written as it is generated (data will still be stored 404 in result.stdout). 405 @param stderr_tee: likewise for stderr. 406 @param verbose: if True, log the command being run. 407 @param stdin: stdin to pass to the executed process (can be a file 408 descriptor, a file object of a real file or a string). 409 @param args: sequence of strings of arguments to be given to the command 410 inside " quotes after they have been escaped for that; each 411 element in the sequence will be given as a separate command 412 argument 413 414 @return a CmdResult object 415 416 @raise CmdError: the exit code of the command execution was not 0 417 """ 418 if isinstance(args, basestring): 419 raise TypeError('Got a string for the "args" keyword argument, ' 420 'need a sequence.') 421 422 for arg in args: 423 command += ' "%s"' % sh_escape(arg) 424 if stderr_is_expected is None: 425 stderr_is_expected = ignore_status 426 427 bg_job = join_bg_jobs( 428 (BgJob(command, stdout_tee, stderr_tee, verbose, stdin=stdin, 429 stderr_level=get_stderr_level(stderr_is_expected)),), 430 timeout)[0] 431 if not ignore_status and bg_job.result.exit_status: 432 raise error.CmdError(command, bg_job.result, 433 "Command returned non-zero exit status") 434 435 return bg_job.result 436 437 438def run_parallel(commands, timeout=None, ignore_status=False, 439 stdout_tee=None, stderr_tee=None): 440 """ 441 Behaves the same as run() with the following exceptions: 442 443 - commands is a list of commands to run in parallel. 444 - ignore_status toggles whether or not an exception should be raised 445 on any error. 446 447 @return: a list of CmdResult objects 448 """ 449 bg_jobs = [] 450 for command in commands: 451 bg_jobs.append(BgJob(command, stdout_tee, stderr_tee, 452 stderr_level=get_stderr_level(ignore_status))) 453 454 # Updates objects in bg_jobs list with their process information 455 join_bg_jobs(bg_jobs, timeout) 456 457 for bg_job in bg_jobs: 458 if not ignore_status and bg_job.result.exit_status: 459 raise error.CmdError(command, bg_job.result, 460 "Command returned non-zero exit status") 461 462 return [bg_job.result for bg_job in bg_jobs] 463 464 465@deprecated 466def run_bg(command): 467 """Function deprecated. Please use BgJob class instead.""" 468 bg_job = BgJob(command) 469 return bg_job.sp, bg_job.result 470 471 472def join_bg_jobs(bg_jobs, timeout=None): 473 """Joins the bg_jobs with the current thread. 474 475 Returns the same list of bg_jobs objects that was passed in. 476 """ 477 ret, timeout_error = 0, False 478 for bg_job in bg_jobs: 479 bg_job.output_prepare(StringIO.StringIO(), StringIO.StringIO()) 480 481 try: 482 # We are holding ends to stdin, stdout pipes 483 # hence we need to be sure to close those fds no mater what 484 start_time = time.time() 485 timeout_error = _wait_for_commands(bg_jobs, start_time, timeout) 486 487 for bg_job in bg_jobs: 488 # Process stdout and stderr 489 bg_job.process_output(stdout=True,final_read=True) 490 bg_job.process_output(stdout=False,final_read=True) 491 finally: 492 # close our ends of the pipes to the sp no matter what 493 for bg_job in bg_jobs: 494 bg_job.cleanup() 495 496 if timeout_error: 497 # TODO: This needs to be fixed to better represent what happens when 498 # running in parallel. However this is backwards compatable, so it will 499 # do for the time being. 500 raise error.CmdError(bg_jobs[0].command, bg_jobs[0].result, 501 "Command(s) did not complete within %d seconds" 502 % timeout) 503 504 505 return bg_jobs 506 507 508def _wait_for_commands(bg_jobs, start_time, timeout): 509 # This returns True if it must return due to a timeout, otherwise False. 510 511 # To check for processes which terminate without producing any output 512 # a 1 second timeout is used in select. 513 SELECT_TIMEOUT = 1 514 515 read_list = [] 516 write_list = [] 517 reverse_dict = {} 518 519 for bg_job in bg_jobs: 520 read_list.append(bg_job.sp.stdout) 521 read_list.append(bg_job.sp.stderr) 522 reverse_dict[bg_job.sp.stdout] = (bg_job, True) 523 reverse_dict[bg_job.sp.stderr] = (bg_job, False) 524 if bg_job.string_stdin is not None: 525 write_list.append(bg_job.sp.stdin) 526 reverse_dict[bg_job.sp.stdin] = bg_job 527 528 if timeout: 529 stop_time = start_time + timeout 530 time_left = stop_time - time.time() 531 else: 532 time_left = None # so that select never times out 533 534 while not timeout or time_left > 0: 535 # select will return when we may write to stdin or when there is 536 # stdout/stderr output we can read (including when it is 537 # EOF, that is the process has terminated). 538 read_ready, write_ready, _ = select.select(read_list, write_list, [], 539 SELECT_TIMEOUT) 540 541 # os.read() has to be used instead of 542 # subproc.stdout.read() which will otherwise block 543 for file_obj in read_ready: 544 bg_job, is_stdout = reverse_dict[file_obj] 545 bg_job.process_output(is_stdout) 546 547 for file_obj in write_ready: 548 # we can write PIPE_BUF bytes without blocking 549 # POSIX requires PIPE_BUF is >= 512 550 bg_job = reverse_dict[file_obj] 551 file_obj.write(bg_job.string_stdin[:512]) 552 bg_job.string_stdin = bg_job.string_stdin[512:] 553 # no more input data, close stdin, remove it from the select set 554 if not bg_job.string_stdin: 555 file_obj.close() 556 write_list.remove(file_obj) 557 del reverse_dict[file_obj] 558 559 all_jobs_finished = True 560 for bg_job in bg_jobs: 561 if bg_job.result.exit_status is not None: 562 continue 563 564 bg_job.result.exit_status = bg_job.sp.poll() 565 if bg_job.result.exit_status is not None: 566 # process exited, remove its stdout/stdin from the select set 567 read_list.remove(bg_job.sp.stdout) 568 read_list.remove(bg_job.sp.stderr) 569 del reverse_dict[bg_job.sp.stdout] 570 del reverse_dict[bg_job.sp.stderr] 571 else: 572 all_jobs_finished = False 573 574 if all_jobs_finished: 575 return False 576 577 if timeout: 578 time_left = stop_time - time.time() 579 580 # Kill all processes which did not complete prior to timeout 581 for bg_job in bg_jobs: 582 if bg_job.result.exit_status is not None: 583 continue 584 585 logging.warn('run process timeout (%s) fired on: %s', timeout, 586 bg_job.command) 587 nuke_subprocess(bg_job.sp) 588 bg_job.result.exit_status = bg_job.sp.poll() 589 590 return True 591 592 593def pid_is_alive(pid): 594 """ 595 True if process pid exists and is not yet stuck in Zombie state. 596 Zombies are impossible to move between cgroups, etc. 597 pid can be integer, or text of integer. 598 """ 599 path = '/proc/%s/stat' % pid 600 601 try: 602 stat = read_one_line(path) 603 except IOError: 604 if not os.path.exists(path): 605 # file went away 606 return False 607 raise 608 609 return stat.split()[2] != 'Z' 610 611 612def signal_pid(pid, sig): 613 """ 614 Sends a signal to a process id. Returns True if the process terminated 615 successfully, False otherwise. 616 """ 617 try: 618 os.kill(pid, sig) 619 except OSError: 620 # The process may have died before we could kill it. 621 pass 622 623 for i in range(5): 624 if not pid_is_alive(pid): 625 return True 626 time.sleep(1) 627 628 # The process is still alive 629 return False 630 631 632def nuke_subprocess(subproc): 633 # check if the subprocess is still alive, first 634 if subproc.poll() is not None: 635 return subproc.poll() 636 637 # the process has not terminated within timeout, 638 # kill it via an escalating series of signals. 639 signal_queue = [signal.SIGTERM, signal.SIGKILL] 640 for sig in signal_queue: 641 signal_pid(subproc.pid, sig) 642 if subproc.poll() is not None: 643 return subproc.poll() 644 645 646def nuke_pid(pid, signal_queue=(signal.SIGTERM, signal.SIGKILL)): 647 # the process has not terminated within timeout, 648 # kill it via an escalating series of signals. 649 for sig in signal_queue: 650 if signal_pid(pid, sig): 651 return 652 653 # no signal successfully terminated the process 654 raise error.AutoservRunError('Could not kill %d' % pid, None) 655 656 657def system(command, timeout=None, ignore_status=False): 658 """ 659 Run a command 660 661 @param timeout: timeout in seconds 662 @param ignore_status: if ignore_status=False, throw an exception if the 663 command's exit code is non-zero 664 if ignore_stauts=True, return the exit code. 665 666 @return exit status of command 667 (note, this will always be zero unless ignore_status=True) 668 """ 669 return run(command, timeout=timeout, ignore_status=ignore_status, 670 stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS).exit_status 671 672 673def system_parallel(commands, timeout=None, ignore_status=False): 674 """This function returns a list of exit statuses for the respective 675 list of commands.""" 676 return [bg_jobs.exit_status for bg_jobs in 677 run_parallel(commands, timeout=timeout, ignore_status=ignore_status, 678 stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS)] 679 680 681def system_output(command, timeout=None, ignore_status=False, 682 retain_output=False, args=()): 683 """ 684 Run a command and return the stdout output. 685 686 @param command: command string to execute. 687 @param timeout: time limit in seconds before attempting to kill the 688 running process. The function will take a few seconds longer 689 than 'timeout' to complete if it has to kill the process. 690 @param ignore_status: do not raise an exception, no matter what the exit 691 code of the command is. 692 @param retain_output: set to True to make stdout/stderr of the command 693 output to be also sent to the logging system 694 @param args: sequence of strings of arguments to be given to the command 695 inside " quotes after they have been escaped for that; each 696 element in the sequence will be given as a separate command 697 argument 698 699 @return a string with the stdout output of the command. 700 """ 701 if retain_output: 702 out = run(command, timeout=timeout, ignore_status=ignore_status, 703 stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS, 704 args=args).stdout 705 else: 706 out = run(command, timeout=timeout, ignore_status=ignore_status, 707 args=args).stdout 708 if out[-1:] == '\n': 709 out = out[:-1] 710 return out 711 712 713def system_output_parallel(commands, timeout=None, ignore_status=False, 714 retain_output=False): 715 if retain_output: 716 out = [bg_job.stdout for bg_job 717 in run_parallel(commands, timeout=timeout, 718 ignore_status=ignore_status, 719 stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS)] 720 else: 721 out = [bg_job.stdout for bg_job in run_parallel(commands, 722 timeout=timeout, ignore_status=ignore_status)] 723 for x in out: 724 if out[-1:] == '\n': out = out[:-1] 725 return out 726 727 728def strip_unicode(input): 729 if type(input) == list: 730 return [strip_unicode(i) for i in input] 731 elif type(input) == dict: 732 output = {} 733 for key in input.keys(): 734 output[str(key)] = strip_unicode(input[key]) 735 return output 736 elif type(input) == unicode: 737 return str(input) 738 else: 739 return input 740 741 742def get_cpu_percentage(function, *args, **dargs): 743 """Returns a tuple containing the CPU% and return value from function call. 744 745 This function calculates the usage time by taking the difference of 746 the user and system times both before and after the function call. 747 """ 748 child_pre = resource.getrusage(resource.RUSAGE_CHILDREN) 749 self_pre = resource.getrusage(resource.RUSAGE_SELF) 750 start = time.time() 751 to_return = function(*args, **dargs) 752 elapsed = time.time() - start 753 self_post = resource.getrusage(resource.RUSAGE_SELF) 754 child_post = resource.getrusage(resource.RUSAGE_CHILDREN) 755 756 # Calculate CPU Percentage 757 s_user, s_system = [a - b for a, b in zip(self_post, self_pre)[:2]] 758 c_user, c_system = [a - b for a, b in zip(child_post, child_pre)[:2]] 759 cpu_percent = (s_user + c_user + s_system + c_system) / elapsed 760 761 return cpu_percent, to_return 762 763 764""" 765This function is used when there is a need to run more than one 766job simultaneously starting exactly at the same time. It basically returns 767a modified control file (containing the synchronization code prepended) 768whenever it is ready to run the control file. The synchronization 769is done using barriers to make sure that the jobs start at the same time. 770 771Here is how the synchronization is done to make sure that the tests 772start at exactly the same time on the client. 773sc_bar is a server barrier and s_bar, c_bar are the normal barriers 774 775 Job1 Job2 ...... JobN 776 Server: | sc_bar 777 Server: | s_bar ...... s_bar 778 Server: | at.run() at.run() ...... at.run() 779 ----------|------------------------------------------------------ 780 Client | sc_bar 781 Client | c_bar c_bar ...... c_bar 782 Client | <run test> <run test> ...... <run test> 783 784 785PARAMS: 786 control_file : The control file which to which the above synchronization 787 code would be prepended to 788 host_name : The host name on which the job is going to run 789 host_num (non negative) : A number to identify the machine so that we have 790 different sets of s_bar_ports for each of the machines. 791 instance : The number of the job 792 num_jobs : Total number of jobs that are going to run in parallel with 793 this job starting at the same time 794 port_base : Port number that is used to derive the actual barrier ports. 795 796RETURN VALUE: 797 The modified control file. 798 799""" 800def get_sync_control_file(control, host_name, host_num, 801 instance, num_jobs, port_base=63100): 802 sc_bar_port = port_base 803 c_bar_port = port_base 804 if host_num < 0: 805 print "Please provide a non negative number for the host" 806 return None 807 s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are 808 # the same for a given machine 809 810 sc_bar_timeout = 180 811 s_bar_timeout = c_bar_timeout = 120 812 813 # The barrier code snippet is prepended into the conrol file 814 # dynamically before at.run() is called finally. 815 control_new = [] 816 817 # jobid is the unique name used to identify the processes 818 # trying to reach the barriers 819 jobid = "%s#%d" % (host_name, instance) 820 821 rendv = [] 822 # rendvstr is a temp holder for the rendezvous list of the processes 823 for n in range(num_jobs): 824 rendv.append("'%s#%d'" % (host_name, n)) 825 rendvstr = ",".join(rendv) 826 827 if instance == 0: 828 # Do the setup and wait at the server barrier 829 # Clean up the tmp and the control dirs for the first instance 830 control_new.append('if os.path.exists(job.tmpdir):') 831 control_new.append("\t system('umount -f %s > /dev/null" 832 "2> /dev/null' % job.tmpdir," 833 "ignore_status=True)") 834 control_new.append("\t system('rm -rf ' + job.tmpdir)") 835 control_new.append( 836 'b0 = job.barrier("%s", "sc_bar", %d, port=%d)' 837 % (jobid, sc_bar_timeout, sc_bar_port)) 838 control_new.append( 839 'b0.rendezvous_servers("PARALLEL_MASTER", "%s")' 840 % jobid) 841 842 elif instance == 1: 843 # Wait at the server barrier to wait for instance=0 844 # process to complete setup 845 b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout, 846 port=sc_bar_port) 847 b0.rendezvous_servers("PARALLEL_MASTER", jobid) 848 849 if(num_jobs > 2): 850 b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout, 851 port=s_bar_port) 852 b1.rendezvous(rendvstr) 853 854 else: 855 # For the rest of the clients 856 b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port) 857 b2.rendezvous(rendvstr) 858 859 # Client side barrier for all the tests to start at the same time 860 control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)' 861 % (jobid, c_bar_timeout, c_bar_port)) 862 control_new.append("b1.rendezvous(%s)" % rendvstr) 863 864 # Stick in the rest of the control file 865 control_new.append(control) 866 867 return "\n".join(control_new) 868 869 870def get_arch(run_function=run): 871 """ 872 Get the hardware architecture of the machine. 873 run_function is used to execute the commands. It defaults to 874 utils.run() but a custom method (if provided) should be of the 875 same schema as utils.run. It should return a CmdResult object and 876 throw a CmdError exception. 877 """ 878 arch = run_function('/bin/uname -m').stdout.rstrip() 879 if re.match(r'i\d86$', arch): 880 arch = 'i386' 881 return arch 882 883 884def get_num_logical_cpus_per_socket(run_function=run): 885 """ 886 Get the number of cores (including hyperthreading) per cpu. 887 run_function is used to execute the commands. It defaults to 888 utils.run() but a custom method (if provided) should be of the 889 same schema as utils.run. It should return a CmdResult object and 890 throw a CmdError exception. 891 """ 892 siblings = run_function('grep "^siblings" /proc/cpuinfo').stdout.rstrip() 893 num_siblings = map(int, 894 re.findall(r'^siblings\s*:\s*(\d+)\s*$', 895 siblings, re.M)) 896 if len(num_siblings) == 0: 897 raise error.TestError('Unable to find siblings info in /proc/cpuinfo') 898 if min(num_siblings) != max(num_siblings): 899 raise error.TestError('Number of siblings differ %r' % 900 num_siblings) 901 return num_siblings[0] 902 903 904def merge_trees(src, dest): 905 """ 906 Merges a source directory tree at 'src' into a destination tree at 907 'dest'. If a path is a file in both trees than the file in the source 908 tree is APPENDED to the one in the destination tree. If a path is 909 a directory in both trees then the directories are recursively merged 910 with this function. In any other case, the function will skip the 911 paths that cannot be merged (instead of failing). 912 """ 913 if not os.path.exists(src): 914 return # exists only in dest 915 elif not os.path.exists(dest): 916 if os.path.isfile(src): 917 shutil.copy2(src, dest) # file only in src 918 else: 919 shutil.copytree(src, dest, symlinks=True) # dir only in src 920 return 921 elif os.path.isfile(src) and os.path.isfile(dest): 922 # src & dest are files in both trees, append src to dest 923 destfile = open(dest, "a") 924 try: 925 srcfile = open(src) 926 try: 927 destfile.write(srcfile.read()) 928 finally: 929 srcfile.close() 930 finally: 931 destfile.close() 932 elif os.path.isdir(src) and os.path.isdir(dest): 933 # src & dest are directories in both trees, so recursively merge 934 for name in os.listdir(src): 935 merge_trees(os.path.join(src, name), os.path.join(dest, name)) 936 else: 937 # src & dest both exist, but are incompatible 938 return 939 940 941class CmdResult(object): 942 """ 943 Command execution result. 944 945 command: String containing the command line itself 946 exit_status: Integer exit code of the process 947 stdout: String containing stdout of the process 948 stderr: String containing stderr of the process 949 duration: Elapsed wall clock time running the process 950 """ 951 952 953 def __init__(self, command="", stdout="", stderr="", 954 exit_status=None, duration=0): 955 self.command = command 956 self.exit_status = exit_status 957 self.stdout = stdout 958 self.stderr = stderr 959 self.duration = duration 960 961 962 def __repr__(self): 963 wrapper = textwrap.TextWrapper(width = 78, 964 initial_indent="\n ", 965 subsequent_indent=" ") 966 967 stdout = self.stdout.rstrip() 968 if stdout: 969 stdout = "\nstdout:\n%s" % stdout 970 971 stderr = self.stderr.rstrip() 972 if stderr: 973 stderr = "\nstderr:\n%s" % stderr 974 975 return ("* Command: %s\n" 976 "Exit status: %s\n" 977 "Duration: %s\n" 978 "%s" 979 "%s" 980 % (wrapper.fill(self.command), self.exit_status, 981 self.duration, stdout, stderr)) 982 983 984class run_randomly: 985 def __init__(self, run_sequentially=False): 986 # Run sequentially is for debugging control files 987 self.test_list = [] 988 self.run_sequentially = run_sequentially 989 990 991 def add(self, *args, **dargs): 992 test = (args, dargs) 993 self.test_list.append(test) 994 995 996 def run(self, fn): 997 while self.test_list: 998 test_index = random.randint(0, len(self.test_list)-1) 999 if self.run_sequentially: 1000 test_index = 0 1001 (args, dargs) = self.test_list.pop(test_index) 1002 fn(*args, **dargs) 1003 1004 1005def import_site_module(path, module, dummy=None, modulefile=None): 1006 """ 1007 Try to import the site specific module if it exists. 1008 1009 @param path full filename of the source file calling this (ie __file__) 1010 @param module full module name 1011 @param dummy dummy value to return in case there is no symbol to import 1012 @param modulefile module filename 1013 1014 @return site specific module or dummy 1015 1016 @raises ImportError if the site file exists but imports fails 1017 """ 1018 short_module = module[module.rfind(".") + 1:] 1019 1020 if not modulefile: 1021 modulefile = short_module + ".py" 1022 1023 if os.path.exists(os.path.join(os.path.dirname(path), modulefile)): 1024 return __import__(module, {}, {}, [short_module]) 1025 return dummy 1026 1027 1028def import_site_symbol(path, module, name, dummy=None, modulefile=None): 1029 """ 1030 Try to import site specific symbol from site specific file if it exists 1031 1032 @param path full filename of the source file calling this (ie __file__) 1033 @param module full module name 1034 @param name symbol name to be imported from the site file 1035 @param dummy dummy value to return in case there is no symbol to import 1036 @param modulefile module filename 1037 1038 @return site specific symbol or dummy 1039 1040 @raises ImportError if the site file exists but imports fails 1041 """ 1042 module = import_site_module(path, module, modulefile=modulefile) 1043 if not module: 1044 return dummy 1045 1046 # special unique value to tell us if the symbol can't be imported 1047 cant_import = object() 1048 1049 obj = getattr(module, name, cant_import) 1050 if obj is cant_import: 1051 logging.debug("unable to import site symbol '%s', using non-site " 1052 "implementation", name) 1053 return dummy 1054 1055 return obj 1056 1057 1058def import_site_class(path, module, classname, baseclass, modulefile=None): 1059 """ 1060 Try to import site specific class from site specific file if it exists 1061 1062 Args: 1063 path: full filename of the source file calling this (ie __file__) 1064 module: full module name 1065 classname: class name to be loaded from site file 1066 baseclass: base class object to return when no site file present or 1067 to mixin when site class exists but is not inherited from baseclass 1068 modulefile: module filename 1069 1070 Returns: baseclass if site specific class does not exist, the site specific 1071 class if it exists and is inherited from baseclass or a mixin of the 1072 site specific class and baseclass when the site specific class exists 1073 and is not inherited from baseclass 1074 1075 Raises: ImportError if the site file exists but imports fails 1076 """ 1077 1078 res = import_site_symbol(path, module, classname, None, modulefile) 1079 if res: 1080 if not issubclass(res, baseclass): 1081 # if not a subclass of baseclass then mix in baseclass with the 1082 # site specific class object and return the result 1083 res = type(classname, (res, baseclass), {}) 1084 else: 1085 res = baseclass 1086 1087 return res 1088 1089 1090def import_site_function(path, module, funcname, dummy, modulefile=None): 1091 """ 1092 Try to import site specific function from site specific file if it exists 1093 1094 Args: 1095 path: full filename of the source file calling this (ie __file__) 1096 module: full module name 1097 funcname: function name to be imported from site file 1098 dummy: dummy function to return in case there is no function to import 1099 modulefile: module filename 1100 1101 Returns: site specific function object or dummy 1102 1103 Raises: ImportError if the site file exists but imports fails 1104 """ 1105 1106 return import_site_symbol(path, module, funcname, dummy, modulefile) 1107 1108 1109def _get_pid_path(program_name): 1110 my_path = os.path.dirname(__file__) 1111 return os.path.abspath(os.path.join(my_path, "..", "..", 1112 "%s.pid" % program_name)) 1113 1114 1115def write_pid(program_name): 1116 """ 1117 Try to drop <program_name>.pid in the main autotest directory. 1118 1119 Args: 1120 program_name: prefix for file name 1121 """ 1122 pidfile = open(_get_pid_path(program_name), "w") 1123 try: 1124 pidfile.write("%s\n" % os.getpid()) 1125 finally: 1126 pidfile.close() 1127 1128 1129def delete_pid_file_if_exists(program_name): 1130 """ 1131 Tries to remove <program_name>.pid from the main autotest directory. 1132 """ 1133 pidfile_path = _get_pid_path(program_name) 1134 1135 try: 1136 os.remove(pidfile_path) 1137 except OSError: 1138 if not os.path.exists(pidfile_path): 1139 return 1140 raise 1141 1142 1143def get_pid_from_file(program_name): 1144 """ 1145 Reads the pid from <program_name>.pid in the autotest directory. 1146 1147 @param program_name the name of the program 1148 @return the pid if the file exists, None otherwise. 1149 """ 1150 pidfile_path = _get_pid_path(program_name) 1151 if not os.path.exists(pidfile_path): 1152 return None 1153 1154 pidfile = open(_get_pid_path(program_name), 'r') 1155 1156 try: 1157 try: 1158 pid = int(pidfile.readline()) 1159 except IOError: 1160 if not os.path.exists(pidfile_path): 1161 return None 1162 raise 1163 finally: 1164 pidfile.close() 1165 1166 return pid 1167 1168 1169def program_is_alive(program_name): 1170 """ 1171 Checks if the process is alive and not in Zombie state. 1172 1173 @param program_name the name of the program 1174 @return True if still alive, False otherwise 1175 """ 1176 pid = get_pid_from_file(program_name) 1177 if pid is None: 1178 return False 1179 return pid_is_alive(pid) 1180 1181 1182def signal_program(program_name, sig=signal.SIGTERM): 1183 """ 1184 Sends a signal to the process listed in <program_name>.pid 1185 1186 @param program_name the name of the program 1187 @param sig signal to send 1188 """ 1189 pid = get_pid_from_file(program_name) 1190 if pid: 1191 signal_pid(pid, sig) 1192 1193 1194def get_relative_path(path, reference): 1195 """Given 2 absolute paths "path" and "reference", compute the path of 1196 "path" as relative to the directory "reference". 1197 1198 @param path the absolute path to convert to a relative path 1199 @param reference an absolute directory path to which the relative 1200 path will be computed 1201 """ 1202 # normalize the paths (remove double slashes, etc) 1203 assert(os.path.isabs(path)) 1204 assert(os.path.isabs(reference)) 1205 1206 path = os.path.normpath(path) 1207 reference = os.path.normpath(reference) 1208 1209 # we could use os.path.split() but it splits from the end 1210 path_list = path.split(os.path.sep)[1:] 1211 ref_list = reference.split(os.path.sep)[1:] 1212 1213 # find the longest leading common path 1214 for i in xrange(min(len(path_list), len(ref_list))): 1215 if path_list[i] != ref_list[i]: 1216 # decrement i so when exiting this loop either by no match or by 1217 # end of range we are one step behind 1218 i -= 1 1219 break 1220 i += 1 1221 # drop the common part of the paths, not interested in that anymore 1222 del path_list[:i] 1223 1224 # for each uncommon component in the reference prepend a ".." 1225 path_list[:0] = ['..'] * (len(ref_list) - i) 1226 1227 return os.path.join(*path_list) 1228 1229 1230def sh_escape(command): 1231 """ 1232 Escape special characters from a command so that it can be passed 1233 as a double quoted (" ") string in a (ba)sh command. 1234 1235 Args: 1236 command: the command string to escape. 1237 1238 Returns: 1239 The escaped command string. The required englobing double 1240 quotes are NOT added and so should be added at some point by 1241 the caller. 1242 1243 See also: http://www.tldp.org/LDP/abs/html/escapingsection.html 1244 """ 1245 command = command.replace("\\", "\\\\") 1246 command = command.replace("$", r'\$') 1247 command = command.replace('"', r'\"') 1248 command = command.replace('`', r'\`') 1249 return command 1250