utils.py revision 549afada66cc95a35fb0be76357f335016a1b122
1# 2# Copyright 2008 Google Inc. Released under the GPL v2 3 4import os, pickle, random, re, resource, select, shutil, signal, StringIO 5import socket, struct, subprocess, sys, time, textwrap, urlparse 6import warnings, smtplib, logging, urllib2 7from autotest_lib.client.common_lib import error, barrier, logging_manager 8 9def deprecated(func): 10 """This is a decorator which can be used to mark functions as deprecated. 11 It will result in a warning being emmitted when the function is used.""" 12 def new_func(*args, **dargs): 13 warnings.warn("Call to deprecated function %s." % func.__name__, 14 category=DeprecationWarning) 15 return func(*args, **dargs) 16 new_func.__name__ = func.__name__ 17 new_func.__doc__ = func.__doc__ 18 new_func.__dict__.update(func.__dict__) 19 return new_func 20 21 22class _NullStream(object): 23 def write(self, data): 24 pass 25 26 27 def flush(self): 28 pass 29 30 31TEE_TO_LOGS = object() 32_the_null_stream = _NullStream() 33 34DEFAULT_STDOUT_LEVEL = logging.DEBUG 35DEFAULT_STDERR_LEVEL = logging.ERROR 36 37def get_stream_tee_file(stream, level): 38 if stream is None: 39 return _the_null_stream 40 if stream is TEE_TO_LOGS: 41 return logging_manager.LoggingFile(level=level) 42 return stream 43 44 45class BgJob(object): 46 def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True, 47 stdin=None, stderr_level=DEFAULT_STDERR_LEVEL): 48 self.command = command 49 self.stdout_tee = get_stream_tee_file(stdout_tee, DEFAULT_STDOUT_LEVEL) 50 self.stderr_tee = get_stream_tee_file(stderr_tee, stderr_level) 51 self.result = CmdResult(command) 52 if verbose: 53 logging.debug("Running '%s'" % command) 54 self.sp = subprocess.Popen(command, stdout=subprocess.PIPE, 55 stderr=subprocess.PIPE, 56 preexec_fn=self._reset_sigpipe, shell=True, 57 executable="/bin/bash", 58 stdin=stdin) 59 60 61 def output_prepare(self, stdout_file=None, stderr_file=None): 62 self.stdout_file = stdout_file 63 self.stderr_file = stderr_file 64 65 66 def process_output(self, stdout=True, final_read=False): 67 """output_prepare must be called prior to calling this""" 68 if stdout: 69 pipe, buf, tee = self.sp.stdout, self.stdout_file, self.stdout_tee 70 else: 71 pipe, buf, tee = self.sp.stderr, self.stderr_file, self.stderr_tee 72 73 if final_read: 74 # read in all the data we can from pipe and then stop 75 data = [] 76 while select.select([pipe], [], [], 0)[0]: 77 data.append(os.read(pipe.fileno(), 1024)) 78 if len(data[-1]) == 0: 79 break 80 data = "".join(data) 81 else: 82 # perform a single read 83 data = os.read(pipe.fileno(), 1024) 84 buf.write(data) 85 tee.write(data) 86 87 88 def cleanup(self): 89 self.stdout_tee.flush() 90 self.stderr_tee.flush() 91 self.sp.stdout.close() 92 self.sp.stderr.close() 93 self.result.stdout = self.stdout_file.getvalue() 94 self.result.stderr = self.stderr_file.getvalue() 95 96 97 def _reset_sigpipe(self): 98 signal.signal(signal.SIGPIPE, signal.SIG_DFL) 99 100 101def ip_to_long(ip): 102 # !L is a long in network byte order 103 return struct.unpack('!L', socket.inet_aton(ip))[0] 104 105 106def long_to_ip(number): 107 # See above comment. 108 return socket.inet_ntoa(struct.pack('!L', number)) 109 110 111def create_subnet_mask(bits): 112 return (1 << 32) - (1 << 32-bits) 113 114 115def format_ip_with_mask(ip, mask_bits): 116 masked_ip = ip_to_long(ip) & create_subnet_mask(mask_bits) 117 return "%s/%s" % (long_to_ip(masked_ip), mask_bits) 118 119 120def normalize_hostname(alias): 121 ip = socket.gethostbyname(alias) 122 return socket.gethostbyaddr(ip)[0] 123 124 125def get_ip_local_port_range(): 126 match = re.match(r'\s*(\d+)\s*(\d+)\s*$', 127 read_one_line('/proc/sys/net/ipv4/ip_local_port_range')) 128 return (int(match.group(1)), int(match.group(2))) 129 130 131def set_ip_local_port_range(lower, upper): 132 write_one_line('/proc/sys/net/ipv4/ip_local_port_range', 133 '%d %d\n' % (lower, upper)) 134 135 136 137def send_email(mail_from, mail_to, subject, body): 138 """ 139 Sends an email via smtp 140 141 mail_from: string with email address of sender 142 mail_to: string or list with email address(es) of recipients 143 subject: string with subject of email 144 body: (multi-line) string with body of email 145 """ 146 if isinstance(mail_to, str): 147 mail_to = [mail_to] 148 msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (mail_from, ','.join(mail_to), 149 subject, body) 150 try: 151 mailer = smtplib.SMTP('localhost') 152 try: 153 mailer.sendmail(mail_from, mail_to, msg) 154 finally: 155 mailer.quit() 156 except Exception, e: 157 # Emails are non-critical, not errors, but don't raise them 158 print "Sending email failed. Reason: %s" % repr(e) 159 160 161def read_one_line(filename): 162 return open(filename, 'r').readline().rstrip('\n') 163 164 165def write_one_line(filename, line): 166 open_write_close(filename, line.rstrip('\n') + '\n') 167 168 169def open_write_close(filename, data): 170 f = open(filename, 'w') 171 try: 172 f.write(data) 173 finally: 174 f.close() 175 176 177def read_keyval(path): 178 """ 179 Read a key-value pair format file into a dictionary, and return it. 180 Takes either a filename or directory name as input. If it's a 181 directory name, we assume you want the file to be called keyval. 182 """ 183 if os.path.isdir(path): 184 path = os.path.join(path, 'keyval') 185 keyval = {} 186 if os.path.exists(path): 187 for line in open(path): 188 line = re.sub('#.*', '', line).rstrip() 189 if not re.search(r'^[-\.\w]+=', line): 190 raise ValueError('Invalid format line: %s' % line) 191 key, value = line.split('=', 1) 192 if re.search('^\d+$', value): 193 value = int(value) 194 elif re.search('^(\d+\.)?\d+$', value): 195 value = float(value) 196 keyval[key] = value 197 return keyval 198 199 200def write_keyval(path, dictionary, type_tag=None): 201 """ 202 Write a key-value pair format file out to a file. This uses append 203 mode to open the file, so existing text will not be overwritten or 204 reparsed. 205 206 If type_tag is None, then the key must be composed of alphanumeric 207 characters (or dashes+underscores). However, if type-tag is not 208 null then the keys must also have "{type_tag}" as a suffix. At 209 the moment the only valid values of type_tag are "attr" and "perf". 210 """ 211 if os.path.isdir(path): 212 path = os.path.join(path, 'keyval') 213 keyval = open(path, 'a') 214 215 if type_tag is None: 216 key_regex = re.compile(r'^[-\.\w]+$') 217 else: 218 if type_tag not in ('attr', 'perf'): 219 raise ValueError('Invalid type tag: %s' % type_tag) 220 escaped_tag = re.escape(type_tag) 221 key_regex = re.compile(r'^[-\.\w]+\{%s\}$' % escaped_tag) 222 try: 223 for key in sorted(dictionary.keys()): 224 if not key_regex.search(key): 225 raise ValueError('Invalid key: %s' % key) 226 keyval.write('%s=%s\n' % (key, dictionary[key])) 227 finally: 228 keyval.close() 229 230 231def is_url(path): 232 """Return true if path looks like a URL""" 233 # for now, just handle http and ftp 234 url_parts = urlparse.urlparse(path) 235 return (url_parts[0] in ('http', 'ftp')) 236 237 238def urlopen(url, data=None, timeout=5): 239 """Wrapper to urllib2.urlopen with timeout addition.""" 240 241 # Save old timeout 242 old_timeout = socket.getdefaulttimeout() 243 socket.setdefaulttimeout(timeout) 244 try: 245 return urllib2.urlopen(url, data=data) 246 finally: 247 socket.setdefaulttimeout(old_timeout) 248 249 250def urlretrieve(url, filename, data=None, timeout=300): 251 """Retrieve a file from given url.""" 252 logging.debug('Fetching %s -> %s', url, filename) 253 254 src_file = urlopen(url, data=data, timeout=timeout) 255 try: 256 dest_file = open(filename, 'wb') 257 try: 258 shutil.copyfileobj(src_file, dest_file) 259 finally: 260 dest_file.close() 261 finally: 262 src_file.close() 263 264 265def get_file(src, dest, permissions=None): 266 """Get a file from src, which can be local or a remote URL""" 267 if src == dest: 268 return 269 270 if is_url(src): 271 urlretrieve(src, dest) 272 else: 273 shutil.copyfile(src, dest) 274 275 if permissions: 276 os.chmod(dest, permissions) 277 return dest 278 279 280def unmap_url(srcdir, src, destdir='.'): 281 """ 282 Receives either a path to a local file or a URL. 283 returns either the path to the local file, or the fetched URL 284 285 unmap_url('/usr/src', 'foo.tar', '/tmp') 286 = '/usr/src/foo.tar' 287 unmap_url('/usr/src', 'http://site/file', '/tmp') 288 = '/tmp/file' 289 (after retrieving it) 290 """ 291 if is_url(src): 292 url_parts = urlparse.urlparse(src) 293 filename = os.path.basename(url_parts[2]) 294 dest = os.path.join(destdir, filename) 295 return get_file(src, dest) 296 else: 297 return os.path.join(srcdir, src) 298 299 300def update_version(srcdir, preserve_srcdir, new_version, install, 301 *args, **dargs): 302 """ 303 Make sure srcdir is version new_version 304 305 If not, delete it and install() the new version. 306 307 In the preserve_srcdir case, we just check it's up to date, 308 and if not, we rerun install, without removing srcdir 309 """ 310 versionfile = os.path.join(srcdir, '.version') 311 install_needed = True 312 313 if os.path.exists(versionfile): 314 old_version = pickle.load(open(versionfile)) 315 if old_version == new_version: 316 install_needed = False 317 318 if install_needed: 319 if not preserve_srcdir and os.path.exists(srcdir): 320 shutil.rmtree(srcdir) 321 install(*args, **dargs) 322 if os.path.exists(srcdir): 323 pickle.dump(new_version, open(versionfile, 'w')) 324 325 326def get_stderr_level(stderr_is_expected): 327 if stderr_is_expected: 328 return DEFAULT_STDOUT_LEVEL 329 return DEFAULT_STDERR_LEVEL 330 331 332def run(command, timeout=None, ignore_status=False, 333 stdout_tee=None, stderr_tee=None, verbose=True, stdin=None, 334 stderr_is_expected=None): 335 """ 336 Run a command on the host. 337 338 Args: 339 command: the command line string 340 timeout: time limit in seconds before attempting to 341 kill the running process. The run() function 342 will take a few seconds longer than 'timeout' 343 to complete if it has to kill the process. 344 ignore_status: do not raise an exception, no matter what 345 the exit code of the command is. 346 stdout_tee: optional file-like object to which stdout data 347 will be written as it is generated (data will still 348 be stored in result.stdout) 349 stderr_tee: likewise for stderr 350 verbose: if True, log the command being run 351 stdin: stdin to pass to the executed process 352 353 Returns: 354 a CmdResult object 355 356 Raises: 357 CmdError: the exit code of the command 358 execution was not 0 359 """ 360 if stderr_is_expected is None: 361 stderr_is_expected = ignore_status 362 bg_job = join_bg_jobs( 363 (BgJob(command, stdout_tee, stderr_tee, verbose, stdin=stdin, 364 stderr_level=get_stderr_level(stderr_is_expected)),), 365 timeout)[0] 366 if not ignore_status and bg_job.result.exit_status: 367 raise error.CmdError(command, bg_job.result, 368 "Command returned non-zero exit status") 369 370 return bg_job.result 371 372 373def run_parallel(commands, timeout=None, ignore_status=False, 374 stdout_tee=None, stderr_tee=None): 375 """Beahves the same as run with the following exceptions: 376 377 - commands is a list of commands to run in parallel. 378 - ignore_status toggles whether or not an exception should be raised 379 on any error. 380 381 returns a list of CmdResult objects 382 """ 383 bg_jobs = [] 384 for command in commands: 385 bg_jobs.append(BgJob(command, stdout_tee, stderr_tee, 386 stderr_level=get_stderr_level(ignore_status))) 387 388 # Updates objects in bg_jobs list with their process information 389 join_bg_jobs(bg_jobs, timeout) 390 391 for bg_job in bg_jobs: 392 if not ignore_status and bg_job.result.exit_status: 393 raise error.CmdError(command, bg_job.result, 394 "Command returned non-zero exit status") 395 396 return [bg_job.result for bg_job in bg_jobs] 397 398 399@deprecated 400def run_bg(command): 401 """Function deprecated. Please use BgJob class instead.""" 402 bg_job = BgJob(command) 403 return bg_job.sp, bg_job.result 404 405 406def join_bg_jobs(bg_jobs, timeout=None): 407 """Joins the bg_jobs with the current thread. 408 409 Returns the same list of bg_jobs objects that was passed in. 410 """ 411 ret, timeout_error = 0, False 412 for bg_job in bg_jobs: 413 bg_job.output_prepare(StringIO.StringIO(), StringIO.StringIO()) 414 415 try: 416 # We are holding ends to stdin, stdout pipes 417 # hence we need to be sure to close those fds no mater what 418 start_time = time.time() 419 timeout_error = _wait_for_commands(bg_jobs, start_time, timeout) 420 421 for bg_job in bg_jobs: 422 # Process stdout and stderr 423 bg_job.process_output(stdout=True,final_read=True) 424 bg_job.process_output(stdout=False,final_read=True) 425 finally: 426 # close our ends of the pipes to the sp no matter what 427 for bg_job in bg_jobs: 428 bg_job.cleanup() 429 430 if timeout_error: 431 # TODO: This needs to be fixed to better represent what happens when 432 # running in parallel. However this is backwards compatable, so it will 433 # do for the time being. 434 raise error.CmdError(bg_jobs[0].command, bg_jobs[0].result, 435 "Command(s) did not complete within %d seconds" 436 % timeout) 437 438 439 return bg_jobs 440 441 442def _wait_for_commands(bg_jobs, start_time, timeout): 443 # This returns True if it must return due to a timeout, otherwise False. 444 445 # To check for processes which terminate without producing any output 446 # a 1 second timeout is used in select. 447 SELECT_TIMEOUT = 1 448 449 select_list = [] 450 reverse_dict = {} 451 for bg_job in bg_jobs: 452 select_list.append(bg_job.sp.stdout) 453 select_list.append(bg_job.sp.stderr) 454 reverse_dict[bg_job.sp.stdout] = (bg_job,True) 455 reverse_dict[bg_job.sp.stderr] = (bg_job,False) 456 457 if timeout: 458 stop_time = start_time + timeout 459 time_left = stop_time - time.time() 460 else: 461 time_left = None # so that select never times out 462 while not timeout or time_left > 0: 463 # select will return when stdout is ready (including when it is 464 # EOF, that is the process has terminated). 465 ready, _, _ = select.select(select_list, [], [], SELECT_TIMEOUT) 466 467 # os.read() has to be used instead of 468 # subproc.stdout.read() which will otherwise block 469 for fileno in ready: 470 bg_job,stdout = reverse_dict[fileno] 471 bg_job.process_output(stdout) 472 473 remaining_jobs = [x for x in bg_jobs if x.result.exit_status is None] 474 if len(remaining_jobs) == 0: 475 return False 476 for bg_job in remaining_jobs: 477 bg_job.result.exit_status = bg_job.sp.poll() 478 479 if timeout: 480 time_left = stop_time - time.time() 481 482 # Kill all processes which did not complete prior to timeout 483 for bg_job in [x for x in bg_jobs if x.result.exit_status is None]: 484 print '* Warning: run process timeout (%s) fired' % timeout 485 nuke_subprocess(bg_job.sp) 486 bg_job.result.exit_status = bg_job.sp.poll() 487 488 return True 489 490 491def pid_is_alive(pid): 492 """ 493 True if process pid exists and is not yet stuck in Zombie state. 494 Zombies are impossible to move between cgroups, etc. 495 pid can be integer, or text of integer. 496 """ 497 path = '/proc/%s/stat' % pid 498 499 try: 500 stat = read_one_line(path) 501 except IOError: 502 if not os.path.exists(path): 503 # file went away 504 return False 505 raise 506 507 return stat.split()[2] != 'Z' 508 509 510def signal_pid(pid, sig): 511 """ 512 Sends a signal to a process id. Returns True if the process terminated 513 successfully, False otherwise. 514 """ 515 try: 516 os.kill(pid, sig) 517 except OSError: 518 # The process may have died before we could kill it. 519 pass 520 521 for i in range(5): 522 if not pid_is_alive(pid): 523 return True 524 time.sleep(1) 525 526 # The process is still alive 527 return False 528 529 530def nuke_subprocess(subproc): 531 # check if the subprocess is still alive, first 532 if subproc.poll() is not None: 533 return subproc.poll() 534 535 # the process has not terminated within timeout, 536 # kill it via an escalating series of signals. 537 signal_queue = [signal.SIGTERM, signal.SIGKILL] 538 for sig in signal_queue: 539 signal_pid(subproc.pid, sig) 540 if subproc.poll() is not None: 541 return subproc.poll() 542 543 544def nuke_pid(pid): 545 # the process has not terminated within timeout, 546 # kill it via an escalating series of signals. 547 signal_queue = [signal.SIGTERM, signal.SIGKILL] 548 for sig in signal_queue: 549 if signal_pid(pid, sig): 550 return 551 552 # no signal successfully terminated the process 553 raise error.AutoservRunError('Could not kill %d' % pid, None) 554 555 556def system(command, timeout=None, ignore_status=False): 557 """This function returns the exit status of command.""" 558 return run(command, timeout=timeout, ignore_status=ignore_status, 559 stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS).exit_status 560 561 562def system_parallel(commands, timeout=None, ignore_status=False): 563 """This function returns a list of exit statuses for the respective 564 list of commands.""" 565 return [bg_jobs.exit_status for bg_jobs in 566 run_parallel(commands, timeout=timeout, ignore_status=ignore_status, 567 stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS)] 568 569 570def system_output(command, timeout=None, ignore_status=False, 571 retain_output=False): 572 if retain_output: 573 out = run(command, timeout=timeout, ignore_status=ignore_status, 574 stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS).stdout 575 else: 576 out = run(command, timeout=timeout, ignore_status=ignore_status).stdout 577 if out[-1:] == '\n': out = out[:-1] 578 return out 579 580 581def system_output_parallel(commands, timeout=None, ignore_status=False, 582 retain_output=False): 583 if retain_output: 584 out = [bg_job.stdout for bg_job 585 in run_parallel(commands, timeout=timeout, 586 ignore_status=ignore_status, 587 stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS)] 588 else: 589 out = [bg_job.stdout for bg_job in run_parallel(commands, 590 timeout=timeout, ignore_status=ignore_status)] 591 for x in out: 592 if out[-1:] == '\n': out = out[:-1] 593 return out 594 595 596def strip_unicode(input): 597 if type(input) == list: 598 return [strip_unicode(i) for i in input] 599 elif type(input) == dict: 600 output = {} 601 for key in input.keys(): 602 output[str(key)] = strip_unicode(input[key]) 603 return output 604 elif type(input) == unicode: 605 return str(input) 606 else: 607 return input 608 609 610def get_cpu_percentage(function, *args, **dargs): 611 """Returns a tuple containing the CPU% and return value from function call. 612 613 This function calculates the usage time by taking the difference of 614 the user and system times both before and after the function call. 615 """ 616 child_pre = resource.getrusage(resource.RUSAGE_CHILDREN) 617 self_pre = resource.getrusage(resource.RUSAGE_SELF) 618 start = time.time() 619 to_return = function(*args, **dargs) 620 elapsed = time.time() - start 621 self_post = resource.getrusage(resource.RUSAGE_SELF) 622 child_post = resource.getrusage(resource.RUSAGE_CHILDREN) 623 624 # Calculate CPU Percentage 625 s_user, s_system = [a - b for a, b in zip(self_post, self_pre)[:2]] 626 c_user, c_system = [a - b for a, b in zip(child_post, child_pre)[:2]] 627 cpu_percent = (s_user + c_user + s_system + c_system) / elapsed 628 629 return cpu_percent, to_return 630 631 632""" 633This function is used when there is a need to run more than one 634job simultaneously starting exactly at the same time. It basically returns 635a modified control file (containing the synchronization code prepended) 636whenever it is ready to run the control file. The synchronization 637is done using barriers to make sure that the jobs start at the same time. 638 639Here is how the synchronization is done to make sure that the tests 640start at exactly the same time on the client. 641sc_bar is a server barrier and s_bar, c_bar are the normal barriers 642 643 Job1 Job2 ...... JobN 644 Server: | sc_bar 645 Server: | s_bar ...... s_bar 646 Server: | at.run() at.run() ...... at.run() 647 ----------|------------------------------------------------------ 648 Client | sc_bar 649 Client | c_bar c_bar ...... c_bar 650 Client | <run test> <run test> ...... <run test> 651 652 653PARAMS: 654 control_file : The control file which to which the above synchronization 655 code would be prepended to 656 host_name : The host name on which the job is going to run 657 host_num (non negative) : A number to identify the machine so that we have 658 different sets of s_bar_ports for each of the machines. 659 instance : The number of the job 660 num_jobs : Total number of jobs that are going to run in parallel with 661 this job starting at the same time 662 port_base : Port number that is used to derive the actual barrier ports. 663 664RETURN VALUE: 665 The modified control file. 666 667""" 668def get_sync_control_file(control, host_name, host_num, 669 instance, num_jobs, port_base=63100): 670 sc_bar_port = port_base 671 c_bar_port = port_base 672 if host_num < 0: 673 print "Please provide a non negative number for the host" 674 return None 675 s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are 676 # the same for a given machine 677 678 sc_bar_timeout = 180 679 s_bar_timeout = c_bar_timeout = 120 680 681 # The barrier code snippet is prepended into the conrol file 682 # dynamically before at.run() is called finally. 683 control_new = [] 684 685 # jobid is the unique name used to identify the processes 686 # trying to reach the barriers 687 jobid = "%s#%d" % (host_name, instance) 688 689 rendv = [] 690 # rendvstr is a temp holder for the rendezvous list of the processes 691 for n in range(num_jobs): 692 rendv.append("'%s#%d'" % (host_name, n)) 693 rendvstr = ",".join(rendv) 694 695 if instance == 0: 696 # Do the setup and wait at the server barrier 697 # Clean up the tmp and the control dirs for the first instance 698 control_new.append('if os.path.exists(job.tmpdir):') 699 control_new.append("\t system('umount -f %s > /dev/null" 700 "2> /dev/null' % job.tmpdir," 701 "ignore_status=True)") 702 control_new.append("\t system('rm -rf ' + job.tmpdir)") 703 control_new.append( 704 'b0 = job.barrier("%s", "sc_bar", %d, port=%d)' 705 % (jobid, sc_bar_timeout, sc_bar_port)) 706 control_new.append( 707 'b0.rendezvous_servers("PARALLEL_MASTER", "%s")' 708 % jobid) 709 710 elif instance == 1: 711 # Wait at the server barrier to wait for instance=0 712 # process to complete setup 713 b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout, 714 port=sc_bar_port) 715 b0.rendezvous_servers("PARALLEL_MASTER", jobid) 716 717 if(num_jobs > 2): 718 b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout, 719 port=s_bar_port) 720 b1.rendezvous(rendvstr) 721 722 else: 723 # For the rest of the clients 724 b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port) 725 b2.rendezvous(rendvstr) 726 727 # Client side barrier for all the tests to start at the same time 728 control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)' 729 % (jobid, c_bar_timeout, c_bar_port)) 730 control_new.append("b1.rendezvous(%s)" % rendvstr) 731 732 # Stick in the rest of the control file 733 control_new.append(control) 734 735 return "\n".join(control_new) 736 737 738def get_arch(run_function=run): 739 """ 740 Get the hardware architecture of the machine. 741 run_function is used to execute the commands. It defaults to 742 utils.run() but a custom method (if provided) should be of the 743 same schema as utils.run. It should return a CmdResult object and 744 throw a CmdError exception. 745 """ 746 arch = run_function('/bin/uname -m').stdout.rstrip() 747 if re.match(r'i\d86$', arch): 748 arch = 'i386' 749 return arch 750 751 752def get_num_logical_cpus_per_socket(run_function=run): 753 """ 754 Get the number of cores (including hyperthreading) per cpu. 755 run_function is used to execute the commands. It defaults to 756 utils.run() but a custom method (if provided) should be of the 757 same schema as utils.run. It should return a CmdResult object and 758 throw a CmdError exception. 759 """ 760 siblings = run_function('grep "^siblings" /proc/cpuinfo').stdout.rstrip() 761 num_siblings = map(int, 762 re.findall(r'^siblings\s*:\s*(\d+)\s*$', 763 siblings, re.M)) 764 if len(num_siblings) == 0: 765 raise error.TestError('Unable to find siblings info in /proc/cpuinfo') 766 if min(num_siblings) != max(num_siblings): 767 raise error.TestError('Number of siblings differ %r' % 768 num_siblings) 769 return num_siblings[0] 770 771 772def merge_trees(src, dest): 773 """ 774 Merges a source directory tree at 'src' into a destination tree at 775 'dest'. If a path is a file in both trees than the file in the source 776 tree is APPENDED to the one in the destination tree. If a path is 777 a directory in both trees then the directories are recursively merged 778 with this function. In any other case, the function will skip the 779 paths that cannot be merged (instead of failing). 780 """ 781 if not os.path.exists(src): 782 return # exists only in dest 783 elif not os.path.exists(dest): 784 if os.path.isfile(src): 785 shutil.copy2(src, dest) # file only in src 786 else: 787 shutil.copytree(src, dest, symlinks=True) # dir only in src 788 return 789 elif os.path.isfile(src) and os.path.isfile(dest): 790 # src & dest are files in both trees, append src to dest 791 destfile = open(dest, "a") 792 try: 793 srcfile = open(src) 794 try: 795 destfile.write(srcfile.read()) 796 finally: 797 srcfile.close() 798 finally: 799 destfile.close() 800 elif os.path.isdir(src) and os.path.isdir(dest): 801 # src & dest are directories in both trees, so recursively merge 802 for name in os.listdir(src): 803 merge_trees(os.path.join(src, name), os.path.join(dest, name)) 804 else: 805 # src & dest both exist, but are incompatible 806 return 807 808 809class CmdResult(object): 810 """ 811 Command execution result. 812 813 command: String containing the command line itself 814 exit_status: Integer exit code of the process 815 stdout: String containing stdout of the process 816 stderr: String containing stderr of the process 817 duration: Elapsed wall clock time running the process 818 """ 819 820 821 def __init__(self, command="", stdout="", stderr="", 822 exit_status=None, duration=0): 823 self.command = command 824 self.exit_status = exit_status 825 self.stdout = stdout 826 self.stderr = stderr 827 self.duration = duration 828 829 830 def __repr__(self): 831 wrapper = textwrap.TextWrapper(width = 78, 832 initial_indent="\n ", 833 subsequent_indent=" ") 834 835 stdout = self.stdout.rstrip() 836 if stdout: 837 stdout = "\nstdout:\n%s" % stdout 838 839 stderr = self.stderr.rstrip() 840 if stderr: 841 stderr = "\nstderr:\n%s" % stderr 842 843 return ("* Command: %s\n" 844 "Exit status: %s\n" 845 "Duration: %s\n" 846 "%s" 847 "%s" 848 % (wrapper.fill(self.command), self.exit_status, 849 self.duration, stdout, stderr)) 850 851 852class run_randomly: 853 def __init__(self, run_sequentially=False): 854 # Run sequentially is for debugging control files 855 self.test_list = [] 856 self.run_sequentially = run_sequentially 857 858 859 def add(self, *args, **dargs): 860 test = (args, dargs) 861 self.test_list.append(test) 862 863 864 def run(self, fn): 865 while self.test_list: 866 test_index = random.randint(0, len(self.test_list)-1) 867 if self.run_sequentially: 868 test_index = 0 869 (args, dargs) = self.test_list.pop(test_index) 870 fn(*args, **dargs) 871 872 873def import_site_symbol(path, module, name, dummy=None, modulefile=None): 874 """ 875 Try to import site specific symbol from site specific file if it exists 876 877 @param path full filename of the source file calling this (ie __file__) 878 @param module full module name 879 @param name symbol name to be imported from the site file 880 @param dummy dummy value to return in case there is no symbol to import 881 @param modulefile module filename 882 883 @return site specific symbol or dummy 884 885 @exception ImportError if the site file exists but imports fails 886 """ 887 short_module = module[module.rfind(".") + 1:] 888 889 if not modulefile: 890 modulefile = short_module + ".py" 891 892 try: 893 site_exists = os.path.getsize(os.path.join(os.path.dirname(path), 894 modulefile)) 895 except os.error: 896 site_exists = False 897 898 msg = None 899 if site_exists: 900 # special unique value to tell us if the symbol can't be imported 901 cant_import = object() 902 903 # return the object from the imported module 904 obj = getattr(__import__(module, {}, {}, [short_module]), name, 905 cant_import) 906 if obj is cant_import: 907 msg = ("unable to import site symbol '%s', using non-site " 908 "implementation") % name 909 logging.error(msg) 910 obj = dummy 911 else: 912 obj = dummy 913 914 return obj 915 916 917def import_site_class(path, module, classname, baseclass, modulefile=None): 918 """ 919 Try to import site specific class from site specific file if it exists 920 921 Args: 922 path: full filename of the source file calling this (ie __file__) 923 module: full module name 924 classname: class name to be loaded from site file 925 baseclass: base class object to return when no site file present or 926 to mixin when site class exists but is not inherited from baseclass 927 modulefile: module filename 928 929 Returns: baseclass if site specific class does not exist, the site specific 930 class if it exists and is inherited from baseclass or a mixin of the 931 site specific class and baseclass when the site specific class exists 932 and is not inherited from baseclass 933 934 Raises: ImportError if the site file exists but imports fails 935 """ 936 937 res = import_site_symbol(path, module, classname, None, modulefile) 938 if res: 939 if not issubclass(res, baseclass): 940 # if not a subclass of baseclass then mix in baseclass with the 941 # site specific class object and return the result 942 res = type(classname, (res, baseclass), {}) 943 else: 944 res = baseclass 945 946 return res 947 948 949def import_site_function(path, module, funcname, dummy, modulefile=None): 950 """ 951 Try to import site specific function from site specific file if it exists 952 953 Args: 954 path: full filename of the source file calling this (ie __file__) 955 module: full module name 956 funcname: function name to be imported from site file 957 dummy: dummy function to return in case there is no function to import 958 modulefile: module filename 959 960 Returns: site specific function object or dummy 961 962 Raises: ImportError if the site file exists but imports fails 963 """ 964 965 return import_site_symbol(path, module, funcname, dummy, modulefile) 966 967 968def _get_pid_path(program_name): 969 my_path = os.path.dirname(__file__) 970 return os.path.abspath(os.path.join(my_path, "..", "..", 971 "%s.pid" % program_name)) 972 973 974def write_pid(program_name): 975 """ 976 Try to drop <program_name>.pid in the main autotest directory. 977 978 Args: 979 program_name: prefix for file name 980 """ 981 pidfile = open(_get_pid_path(program_name), "w") 982 try: 983 pidfile.write("%s\n" % os.getpid()) 984 finally: 985 pidfile.close() 986 987 988def delete_pid_file_if_exists(program_name): 989 """ 990 Tries to remove <program_name>.pid from the main autotest directory. 991 """ 992 pidfile_path = _get_pid_path(program_name) 993 994 try: 995 os.remove(pidfile_path) 996 except OSError: 997 if not os.path.exists(pidfile_path): 998 return 999 raise 1000 1001 1002def get_pid_from_file(program_name): 1003 """ 1004 Reads the pid from <program_name>.pid in the autotest directory. 1005 1006 @param program_name the name of the program 1007 @return the pid if the file exists, None otherwise. 1008 """ 1009 pidfile_path = _get_pid_path(program_name) 1010 if not os.path.exists(pidfile_path): 1011 return None 1012 1013 pidfile = open(_get_pid_path(program_name), 'r') 1014 1015 try: 1016 try: 1017 pid = int(pidfile.readline()) 1018 except IOError: 1019 if not os.path.exists(pidfile_path): 1020 return None 1021 raise 1022 finally: 1023 pidfile.close() 1024 1025 return pid 1026 1027 1028def process_is_alive(program_name): 1029 """ 1030 Checks if the process is alive and not in Zombie state. 1031 1032 @param program_name the name of the program 1033 @return True if still alive, False otherwise 1034 """ 1035 pid = get_pid_from_file(program_name) 1036 if pid is None: 1037 return False 1038 return pid_is_alive(pid) 1039 1040 1041def signal_process(program_name, sig=signal.SIGTERM): 1042 """ 1043 Sends a signal to the process listed in <program_name>.pid 1044 1045 @param program_name the name of the program 1046 @param sig signal to send 1047 """ 1048 pid = get_pid_from_file(program_name) 1049 if pid: 1050 signal_pid(pid, sig) 1051 1052 1053def get_relative_path(path, reference): 1054 """Given 2 absolute paths "path" and "reference", compute the path of 1055 "path" as relative to the directory "reference". 1056 1057 @param path the absolute path to convert to a relative path 1058 @param reference an absolute directory path to which the relative 1059 path will be computed 1060 """ 1061 # normalize the paths (remove double slashes, etc) 1062 assert(os.path.isabs(path)) 1063 assert(os.path.isabs(reference)) 1064 1065 path = os.path.normpath(path) 1066 reference = os.path.normpath(reference) 1067 1068 # we could use os.path.split() but it splits from the end 1069 path_list = path.split(os.path.sep)[1:] 1070 ref_list = reference.split(os.path.sep)[1:] 1071 1072 # find the longest leading common path 1073 for i in xrange(min(len(path_list), len(ref_list))): 1074 if path_list[i] != ref_list[i]: 1075 # decrement i so when exiting this loop either by no match or by 1076 # end of range we are one step behind 1077 i -= 1 1078 break 1079 i += 1 1080 # drop the common part of the paths, not interested in that anymore 1081 del path_list[:i] 1082 1083 # for each uncommon component in the reference prepend a ".." 1084 path_list[:0] = ['..'] * (len(ref_list) - i) 1085 1086 return os.path.join(*path_list) 1087 1088 1089def sh_escape(command): 1090 """ 1091 Escape special characters from a command so that it can be passed 1092 as a double quoted (" ") string in a (ba)sh command. 1093 1094 Args: 1095 command: the command string to escape. 1096 1097 Returns: 1098 The escaped command string. The required englobing double 1099 quotes are NOT added and so should be added at some point by 1100 the caller. 1101 1102 See also: http://www.tldp.org/LDP/abs/html/escapingsection.html 1103 """ 1104 command = command.replace("\\", "\\\\") 1105 command = command.replace("$", r'\$') 1106 command = command.replace('"', r'\"') 1107 command = command.replace('`', r'\`') 1108 return command 1109