1""" 2TestCmd.py: a testing framework for commands and scripts. 3 4The TestCmd module provides a framework for portable automated testing 5of executable commands and scripts (in any language, not just Python), 6especially commands and scripts that require file system interaction. 7 8In addition to running tests and evaluating conditions, the TestCmd 9module manages and cleans up one or more temporary workspace 10directories, and provides methods for creating files and directories in 11those workspace directories from in-line data, here-documents), allowing 12tests to be completely self-contained. 13 14A TestCmd environment object is created via the usual invocation: 15 16 import TestCmd 17 test = TestCmd.TestCmd() 18 19There are a bunch of keyword arguments available at instantiation: 20 21 test = TestCmd.TestCmd(description = 'string', 22 program = 'program_or_script_to_test', 23 interpreter = 'script_interpreter', 24 workdir = 'prefix', 25 subdir = 'subdir', 26 verbose = Boolean, 27 match = default_match_function, 28 diff = default_diff_function, 29 combine = Boolean) 30 31There are a bunch of methods that let you do different things: 32 33 test.verbose_set(1) 34 35 test.description_set('string') 36 37 test.program_set('program_or_script_to_test') 38 39 test.interpreter_set('script_interpreter') 40 test.interpreter_set(['script_interpreter', 'arg']) 41 42 test.workdir_set('prefix') 43 test.workdir_set('') 44 45 test.workpath('file') 46 test.workpath('subdir', 'file') 47 48 test.subdir('subdir', ...) 49 50 test.rmdir('subdir', ...) 51 52 test.write('file', "contents\n") 53 test.write(['subdir', 'file'], "contents\n") 54 55 test.read('file') 56 test.read(['subdir', 'file']) 57 test.read('file', mode) 58 test.read(['subdir', 'file'], mode) 59 60 test.writable('dir', 1) 61 test.writable('dir', None) 62 63 test.preserve(condition, ...) 64 65 test.cleanup(condition) 66 67 test.command_args(program = 'program_or_script_to_run', 68 interpreter = 'script_interpreter', 69 arguments = 'arguments to pass to program') 70 71 test.run(program = 'program_or_script_to_run', 72 interpreter = 'script_interpreter', 73 arguments = 'arguments to pass to program', 74 chdir = 'directory_to_chdir_to', 75 stdin = 'input to feed to the program\n') 76 universal_newlines = True) 77 78 p = test.start(program = 'program_or_script_to_run', 79 interpreter = 'script_interpreter', 80 arguments = 'arguments to pass to program', 81 universal_newlines = None) 82 83 test.finish(self, p) 84 85 test.pass_test() 86 test.pass_test(condition) 87 test.pass_test(condition, function) 88 89 test.fail_test() 90 test.fail_test(condition) 91 test.fail_test(condition, function) 92 test.fail_test(condition, function, skip) 93 94 test.no_result() 95 test.no_result(condition) 96 test.no_result(condition, function) 97 test.no_result(condition, function, skip) 98 99 test.stdout() 100 test.stdout(run) 101 102 test.stderr() 103 test.stderr(run) 104 105 test.symlink(target, link) 106 107 test.banner(string) 108 test.banner(string, width) 109 110 test.diff(actual, expected) 111 112 test.match(actual, expected) 113 114 test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n") 115 test.match_exact(["actual 1\n", "actual 2\n"], 116 ["expected 1\n", "expected 2\n"]) 117 118 test.match_re("actual 1\nactual 2\n", regex_string) 119 test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes) 120 121 test.match_re_dotall("actual 1\nactual 2\n", regex_string) 122 test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes) 123 124 test.tempdir() 125 test.tempdir('temporary-directory') 126 127 test.sleep() 128 test.sleep(seconds) 129 130 test.where_is('foo') 131 test.where_is('foo', 'PATH1:PATH2') 132 test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4') 133 134 test.unlink('file') 135 test.unlink('subdir', 'file') 136 137The TestCmd module provides pass_test(), fail_test(), and no_result() 138unbound functions that report test results for use with the Aegis change 139management system. These methods terminate the test immediately, 140reporting PASSED, FAILED, or NO RESULT respectively, and exiting with 141status 0 (success), 1 or 2 respectively. This allows for a distinction 142between an actual failed test and a test that could not be properly 143evaluated because of an external condition (such as a full file system 144or incorrect permissions). 145 146 import TestCmd 147 148 TestCmd.pass_test() 149 TestCmd.pass_test(condition) 150 TestCmd.pass_test(condition, function) 151 152 TestCmd.fail_test() 153 TestCmd.fail_test(condition) 154 TestCmd.fail_test(condition, function) 155 TestCmd.fail_test(condition, function, skip) 156 157 TestCmd.no_result() 158 TestCmd.no_result(condition) 159 TestCmd.no_result(condition, function) 160 TestCmd.no_result(condition, function, skip) 161 162The TestCmd module also provides unbound functions that handle matching 163in the same way as the match_*() methods described above. 164 165 import TestCmd 166 167 test = TestCmd.TestCmd(match = TestCmd.match_exact) 168 169 test = TestCmd.TestCmd(match = TestCmd.match_re) 170 171 test = TestCmd.TestCmd(match = TestCmd.match_re_dotall) 172 173The TestCmd module provides unbound functions that can be used for the 174"diff" argument to TestCmd.TestCmd instantiation: 175 176 import TestCmd 177 178 test = TestCmd.TestCmd(match = TestCmd.match_re, 179 diff = TestCmd.diff_re) 180 181 test = TestCmd.TestCmd(diff = TestCmd.simple_diff) 182 183The "diff" argument can also be used with standard difflib functions: 184 185 import difflib 186 187 test = TestCmd.TestCmd(diff = difflib.context_diff) 188 189 test = TestCmd.TestCmd(diff = difflib.unified_diff) 190 191Lastly, the where_is() method also exists in an unbound function 192version. 193 194 import TestCmd 195 196 TestCmd.where_is('foo') 197 TestCmd.where_is('foo', 'PATH1:PATH2') 198 TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4') 199""" 200 201# Copyright 2000-2010 Steven Knight 202# This module is free software, and you may redistribute it and/or modify 203# it under the same terms as Python itself, so long as this copyright message 204# and disclaimer are retained in their original form. 205# 206# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, 207# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF 208# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH 209# DAMAGE. 210# 211# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT 212# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 213# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, 214# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, 215# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 216 217__author__ = "Steven Knight <knight at baldmt dot com>" 218__revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight" 219__version__ = "0.37" 220 221import errno 222import os 223import os.path 224import re 225import shutil 226import stat 227import string 228import sys 229import tempfile 230import time 231import traceback 232import types 233import UserList 234 235__all__ = [ 236 'diff_re', 237 'fail_test', 238 'no_result', 239 'pass_test', 240 'match_exact', 241 'match_re', 242 'match_re_dotall', 243 'python_executable', 244 'TestCmd' 245] 246 247try: 248 import difflib 249except ImportError: 250 __all__.append('simple_diff') 251 252def is_List(e): 253 return type(e) is types.ListType \ 254 or isinstance(e, UserList.UserList) 255 256try: 257 from UserString import UserString 258except ImportError: 259 class UserString: 260 pass 261 262if hasattr(types, 'UnicodeType'): 263 def is_String(e): 264 return type(e) is types.StringType \ 265 or type(e) is types.UnicodeType \ 266 or isinstance(e, UserString) 267else: 268 def is_String(e): 269 return type(e) is types.StringType or isinstance(e, UserString) 270 271tempfile.template = 'testcmd.' 272if os.name in ('posix', 'nt'): 273 tempfile.template = 'testcmd.' + str(os.getpid()) + '.' 274else: 275 tempfile.template = 'testcmd.' 276 277re_space = re.compile('\s') 278 279_Cleanup = [] 280 281_chain_to_exitfunc = None 282 283def _clean(): 284 global _Cleanup 285 cleanlist = filter(None, _Cleanup) 286 del _Cleanup[:] 287 cleanlist.reverse() 288 for test in cleanlist: 289 test.cleanup() 290 if _chain_to_exitfunc: 291 _chain_to_exitfunc() 292 293try: 294 import atexit 295except ImportError: 296 # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc 297 try: 298 _chain_to_exitfunc = sys.exitfunc 299 except AttributeError: 300 pass 301 sys.exitfunc = _clean 302else: 303 atexit.register(_clean) 304 305try: 306 zip 307except NameError: 308 def zip(*lists): 309 result = [] 310 for i in xrange(min(map(len, lists))): 311 result.append(tuple(map(lambda l, i=i: l[i], lists))) 312 return result 313 314class Collector: 315 def __init__(self, top): 316 self.entries = [top] 317 def __call__(self, arg, dirname, names): 318 pathjoin = lambda n, d=dirname: os.path.join(d, n) 319 self.entries.extend(map(pathjoin, names)) 320 321def _caller(tblist, skip): 322 string = "" 323 arr = [] 324 for file, line, name, text in tblist: 325 if file[-10:] == "TestCmd.py": 326 break 327 arr = [(file, line, name, text)] + arr 328 atfrom = "at" 329 for file, line, name, text in arr[skip:]: 330 if name in ("?", "<module>"): 331 name = "" 332 else: 333 name = " (" + name + ")" 334 string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name)) 335 atfrom = "\tfrom" 336 return string 337 338def fail_test(self = None, condition = 1, function = None, skip = 0): 339 """Cause the test to fail. 340 341 By default, the fail_test() method reports that the test FAILED 342 and exits with a status of 1. If a condition argument is supplied, 343 the test fails only if the condition is true. 344 """ 345 if not condition: 346 return 347 if not function is None: 348 function() 349 of = "" 350 desc = "" 351 sep = " " 352 if not self is None: 353 if self.program: 354 of = " of " + self.program 355 sep = "\n\t" 356 if self.description: 357 desc = " [" + self.description + "]" 358 sep = "\n\t" 359 360 at = _caller(traceback.extract_stack(), skip) 361 sys.stderr.write("FAILED test" + of + desc + sep + at) 362 363 sys.exit(1) 364 365def no_result(self = None, condition = 1, function = None, skip = 0): 366 """Causes a test to exit with no valid result. 367 368 By default, the no_result() method reports NO RESULT for the test 369 and exits with a status of 2. If a condition argument is supplied, 370 the test fails only if the condition is true. 371 """ 372 if not condition: 373 return 374 if not function is None: 375 function() 376 of = "" 377 desc = "" 378 sep = " " 379 if not self is None: 380 if self.program: 381 of = " of " + self.program 382 sep = "\n\t" 383 if self.description: 384 desc = " [" + self.description + "]" 385 sep = "\n\t" 386 387 if os.environ.get('TESTCMD_DEBUG_SKIPS'): 388 at = _caller(traceback.extract_stack(), skip) 389 sys.stderr.write("NO RESULT for test" + of + desc + sep + at) 390 else: 391 sys.stderr.write("NO RESULT\n") 392 393 sys.exit(2) 394 395def pass_test(self = None, condition = 1, function = None): 396 """Causes a test to pass. 397 398 By default, the pass_test() method reports PASSED for the test 399 and exits with a status of 0. If a condition argument is supplied, 400 the test passes only if the condition is true. 401 """ 402 if not condition: 403 return 404 if not function is None: 405 function() 406 sys.stderr.write("PASSED\n") 407 sys.exit(0) 408 409def match_exact(lines = None, matches = None): 410 """ 411 """ 412 if not is_List(lines): 413 lines = string.split(lines, "\n") 414 if not is_List(matches): 415 matches = string.split(matches, "\n") 416 if len(lines) != len(matches): 417 return 418 for i in range(len(lines)): 419 if lines[i] != matches[i]: 420 return 421 return 1 422 423def match_re(lines = None, res = None): 424 """ 425 """ 426 if not is_List(lines): 427 lines = string.split(lines, "\n") 428 if not is_List(res): 429 res = string.split(res, "\n") 430 if len(lines) != len(res): 431 return 432 for i in range(len(lines)): 433 s = "^" + res[i] + "$" 434 try: 435 expr = re.compile(s) 436 except re.error, e: 437 msg = "Regular expression error in %s: %s" 438 raise re.error, msg % (repr(s), e[0]) 439 if not expr.search(lines[i]): 440 return 441 return 1 442 443def match_re_dotall(lines = None, res = None): 444 """ 445 """ 446 if not type(lines) is type(""): 447 lines = string.join(lines, "\n") 448 if not type(res) is type(""): 449 res = string.join(res, "\n") 450 s = "^" + res + "$" 451 try: 452 expr = re.compile(s, re.DOTALL) 453 except re.error, e: 454 msg = "Regular expression error in %s: %s" 455 raise re.error, msg % (repr(s), e[0]) 456 if expr.match(lines): 457 return 1 458 459try: 460 import difflib 461except ImportError: 462 pass 463else: 464 def simple_diff(a, b, fromfile='', tofile='', 465 fromfiledate='', tofiledate='', n=3, lineterm='\n'): 466 """ 467 A function with the same calling signature as difflib.context_diff 468 (diff -c) and difflib.unified_diff (diff -u) but which prints 469 output like the simple, unadorned 'diff" command. 470 """ 471 sm = difflib.SequenceMatcher(None, a, b) 472 def comma(x1, x2): 473 return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2) 474 result = [] 475 for op, a1, a2, b1, b2 in sm.get_opcodes(): 476 if op == 'delete': 477 result.append("%sd%d" % (comma(a1, a2), b1)) 478 result.extend(map(lambda l: '< ' + l, a[a1:a2])) 479 elif op == 'insert': 480 result.append("%da%s" % (a1, comma(b1, b2))) 481 result.extend(map(lambda l: '> ' + l, b[b1:b2])) 482 elif op == 'replace': 483 result.append("%sc%s" % (comma(a1, a2), comma(b1, b2))) 484 result.extend(map(lambda l: '< ' + l, a[a1:a2])) 485 result.append('---') 486 result.extend(map(lambda l: '> ' + l, b[b1:b2])) 487 return result 488 489def diff_re(a, b, fromfile='', tofile='', 490 fromfiledate='', tofiledate='', n=3, lineterm='\n'): 491 """ 492 A simple "diff" of two sets of lines when the expected lines 493 are regular expressions. This is a really dumb thing that 494 just compares each line in turn, so it doesn't look for 495 chunks of matching lines and the like--but at least it lets 496 you know exactly which line first didn't compare correctl... 497 """ 498 result = [] 499 diff = len(a) - len(b) 500 if diff < 0: 501 a = a + ['']*(-diff) 502 elif diff > 0: 503 b = b + ['']*diff 504 i = 0 505 for aline, bline in zip(a, b): 506 s = "^" + aline + "$" 507 try: 508 expr = re.compile(s) 509 except re.error, e: 510 msg = "Regular expression error in %s: %s" 511 raise re.error, msg % (repr(s), e[0]) 512 if not expr.search(bline): 513 result.append("%sc%s" % (i+1, i+1)) 514 result.append('< ' + repr(a[i])) 515 result.append('---') 516 result.append('> ' + repr(b[i])) 517 i = i+1 518 return result 519 520if os.name == 'java': 521 522 python_executable = os.path.join(sys.prefix, 'jython') 523 524else: 525 526 python_executable = sys.executable 527 528if sys.platform == 'win32': 529 530 default_sleep_seconds = 2 531 532 def where_is(file, path=None, pathext=None): 533 if path is None: 534 path = os.environ['PATH'] 535 if is_String(path): 536 path = string.split(path, os.pathsep) 537 if pathext is None: 538 pathext = os.environ['PATHEXT'] 539 if is_String(pathext): 540 pathext = string.split(pathext, os.pathsep) 541 for ext in pathext: 542 if string.lower(ext) == string.lower(file[-len(ext):]): 543 pathext = [''] 544 break 545 for dir in path: 546 f = os.path.join(dir, file) 547 for ext in pathext: 548 fext = f + ext 549 if os.path.isfile(fext): 550 return fext 551 return None 552 553else: 554 555 def where_is(file, path=None, pathext=None): 556 if path is None: 557 path = os.environ['PATH'] 558 if is_String(path): 559 path = string.split(path, os.pathsep) 560 for dir in path: 561 f = os.path.join(dir, file) 562 if os.path.isfile(f): 563 try: 564 st = os.stat(f) 565 except OSError: 566 continue 567 if stat.S_IMODE(st[stat.ST_MODE]) & 0111: 568 return f 569 return None 570 571 default_sleep_seconds = 1 572 573 574 575try: 576 import subprocess 577except ImportError: 578 # The subprocess module doesn't exist in this version of Python, 579 # so we're going to cobble up something that looks just enough 580 # like its API for our purposes below. 581 import new 582 583 subprocess = new.module('subprocess') 584 585 subprocess.PIPE = 'PIPE' 586 subprocess.STDOUT = 'STDOUT' 587 subprocess.mswindows = (sys.platform == 'win32') 588 589 try: 590 import popen2 591 popen2.Popen3 592 except AttributeError: 593 class Popen3: 594 universal_newlines = 1 595 def __init__(self, command, **kw): 596 if sys.platform == 'win32' and command[0] == '"': 597 command = '"' + command + '"' 598 (stdin, stdout, stderr) = os.popen3(' ' + command) 599 self.stdin = stdin 600 self.stdout = stdout 601 self.stderr = stderr 602 def close_output(self): 603 self.stdout.close() 604 self.resultcode = self.stderr.close() 605 def wait(self): 606 resultcode = self.resultcode 607 if os.WIFEXITED(resultcode): 608 return os.WEXITSTATUS(resultcode) 609 elif os.WIFSIGNALED(resultcode): 610 return os.WTERMSIG(resultcode) 611 else: 612 return None 613 614 else: 615 try: 616 popen2.Popen4 617 except AttributeError: 618 # A cribbed Popen4 class, with some retrofitted code from 619 # the Python 1.5 Popen3 class methods to do certain things 620 # by hand. 621 class Popen4(popen2.Popen3): 622 childerr = None 623 624 def __init__(self, cmd, bufsize=-1): 625 p2cread, p2cwrite = os.pipe() 626 c2pread, c2pwrite = os.pipe() 627 self.pid = os.fork() 628 if self.pid == 0: 629 # Child 630 os.dup2(p2cread, 0) 631 os.dup2(c2pwrite, 1) 632 os.dup2(c2pwrite, 2) 633 for i in range(3, popen2.MAXFD): 634 try: 635 os.close(i) 636 except: pass 637 try: 638 os.execvp(cmd[0], cmd) 639 finally: 640 os._exit(1) 641 # Shouldn't come here, I guess 642 os._exit(1) 643 os.close(p2cread) 644 self.tochild = os.fdopen(p2cwrite, 'w', bufsize) 645 os.close(c2pwrite) 646 self.fromchild = os.fdopen(c2pread, 'r', bufsize) 647 popen2._active.append(self) 648 649 popen2.Popen4 = Popen4 650 651 class Popen3(popen2.Popen3, popen2.Popen4): 652 universal_newlines = 1 653 def __init__(self, command, **kw): 654 if kw.get('stderr') == 'STDOUT': 655 apply(popen2.Popen4.__init__, (self, command, 1)) 656 else: 657 apply(popen2.Popen3.__init__, (self, command, 1)) 658 self.stdin = self.tochild 659 self.stdout = self.fromchild 660 self.stderr = self.childerr 661 def wait(self, *args, **kw): 662 resultcode = apply(popen2.Popen3.wait, (self,)+args, kw) 663 if os.WIFEXITED(resultcode): 664 return os.WEXITSTATUS(resultcode) 665 elif os.WIFSIGNALED(resultcode): 666 return os.WTERMSIG(resultcode) 667 else: 668 return None 669 670 subprocess.Popen = Popen3 671 672 673 674# From Josiah Carlson, 675# ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms 676# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554 677 678PIPE = subprocess.PIPE 679 680if subprocess.mswindows: 681 from win32file import ReadFile, WriteFile 682 from win32pipe import PeekNamedPipe 683 import msvcrt 684else: 685 import select 686 import fcntl 687 688 try: fcntl.F_GETFL 689 except AttributeError: fcntl.F_GETFL = 3 690 691 try: fcntl.F_SETFL 692 except AttributeError: fcntl.F_SETFL = 4 693 694class Popen(subprocess.Popen): 695 def recv(self, maxsize=None): 696 return self._recv('stdout', maxsize) 697 698 def recv_err(self, maxsize=None): 699 return self._recv('stderr', maxsize) 700 701 def send_recv(self, input='', maxsize=None): 702 return self.send(input), self.recv(maxsize), self.recv_err(maxsize) 703 704 def get_conn_maxsize(self, which, maxsize): 705 if maxsize is None: 706 maxsize = 1024 707 elif maxsize < 1: 708 maxsize = 1 709 return getattr(self, which), maxsize 710 711 def _close(self, which): 712 getattr(self, which).close() 713 setattr(self, which, None) 714 715 if subprocess.mswindows: 716 def send(self, input): 717 if not self.stdin: 718 return None 719 720 try: 721 x = msvcrt.get_osfhandle(self.stdin.fileno()) 722 (errCode, written) = WriteFile(x, input) 723 except ValueError: 724 return self._close('stdin') 725 except (subprocess.pywintypes.error, Exception), why: 726 if why[0] in (109, errno.ESHUTDOWN): 727 return self._close('stdin') 728 raise 729 730 return written 731 732 def _recv(self, which, maxsize): 733 conn, maxsize = self.get_conn_maxsize(which, maxsize) 734 if conn is None: 735 return None 736 737 try: 738 x = msvcrt.get_osfhandle(conn.fileno()) 739 (read, nAvail, nMessage) = PeekNamedPipe(x, 0) 740 if maxsize < nAvail: 741 nAvail = maxsize 742 if nAvail > 0: 743 (errCode, read) = ReadFile(x, nAvail, None) 744 except ValueError: 745 return self._close(which) 746 except (subprocess.pywintypes.error, Exception), why: 747 if why[0] in (109, errno.ESHUTDOWN): 748 return self._close(which) 749 raise 750 751 #if self.universal_newlines: 752 # read = self._translate_newlines(read) 753 return read 754 755 else: 756 def send(self, input): 757 if not self.stdin: 758 return None 759 760 if not select.select([], [self.stdin], [], 0)[1]: 761 return 0 762 763 try: 764 written = os.write(self.stdin.fileno(), input) 765 except OSError, why: 766 if why[0] == errno.EPIPE: #broken pipe 767 return self._close('stdin') 768 raise 769 770 return written 771 772 def _recv(self, which, maxsize): 773 conn, maxsize = self.get_conn_maxsize(which, maxsize) 774 if conn is None: 775 return None 776 777 try: 778 flags = fcntl.fcntl(conn, fcntl.F_GETFL) 779 except TypeError: 780 flags = None 781 else: 782 if not conn.closed: 783 fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK) 784 785 try: 786 if not select.select([conn], [], [], 0)[0]: 787 return '' 788 789 r = conn.read(maxsize) 790 if not r: 791 return self._close(which) 792 793 #if self.universal_newlines: 794 # r = self._translate_newlines(r) 795 return r 796 finally: 797 if not conn.closed and not flags is None: 798 fcntl.fcntl(conn, fcntl.F_SETFL, flags) 799 800disconnect_message = "Other end disconnected!" 801 802def recv_some(p, t=.1, e=1, tr=5, stderr=0): 803 if tr < 1: 804 tr = 1 805 x = time.time()+t 806 y = [] 807 r = '' 808 pr = p.recv 809 if stderr: 810 pr = p.recv_err 811 while time.time() < x or r: 812 r = pr() 813 if r is None: 814 if e: 815 raise Exception(disconnect_message) 816 else: 817 break 818 elif r: 819 y.append(r) 820 else: 821 time.sleep(max((x-time.time())/tr, 0)) 822 return ''.join(y) 823 824# TODO(3.0: rewrite to use memoryview() 825def send_all(p, data): 826 while len(data): 827 sent = p.send(data) 828 if sent is None: 829 raise Exception(disconnect_message) 830 data = buffer(data, sent) 831 832 833 834try: 835 object 836except NameError: 837 class object: 838 pass 839 840 841 842class TestCmd(object): 843 """Class TestCmd 844 """ 845 846 def __init__(self, description = None, 847 program = None, 848 interpreter = None, 849 workdir = None, 850 subdir = None, 851 verbose = None, 852 match = None, 853 diff = None, 854 combine = 0, 855 universal_newlines = 1): 856 self._cwd = os.getcwd() 857 self.description_set(description) 858 self.program_set(program) 859 self.interpreter_set(interpreter) 860 if verbose is None: 861 try: 862 verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) ) 863 except ValueError: 864 verbose = 0 865 self.verbose_set(verbose) 866 self.combine = combine 867 self.universal_newlines = universal_newlines 868 if match is not None: 869 self.match_function = match 870 else: 871 self.match_function = match_re 872 if diff is not None: 873 self.diff_function = diff 874 else: 875 try: 876 difflib 877 except NameError: 878 pass 879 else: 880 self.diff_function = simple_diff 881 #self.diff_function = difflib.context_diff 882 #self.diff_function = difflib.unified_diff 883 self._dirlist = [] 884 self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0} 885 if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '': 886 self._preserve['pass_test'] = os.environ['PRESERVE'] 887 self._preserve['fail_test'] = os.environ['PRESERVE'] 888 self._preserve['no_result'] = os.environ['PRESERVE'] 889 else: 890 try: 891 self._preserve['pass_test'] = os.environ['PRESERVE_PASS'] 892 except KeyError: 893 pass 894 try: 895 self._preserve['fail_test'] = os.environ['PRESERVE_FAIL'] 896 except KeyError: 897 pass 898 try: 899 self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT'] 900 except KeyError: 901 pass 902 self._stdout = [] 903 self._stderr = [] 904 self.status = None 905 self.condition = 'no_result' 906 self.workdir_set(workdir) 907 self.subdir(subdir) 908 909 def __del__(self): 910 self.cleanup() 911 912 def __repr__(self): 913 return "%x" % id(self) 914 915 banner_char = '=' 916 banner_width = 80 917 918 def banner(self, s, width=None): 919 if width is None: 920 width = self.banner_width 921 return s + self.banner_char * (width - len(s)) 922 923 if os.name == 'posix': 924 925 def escape(self, arg): 926 "escape shell special characters" 927 slash = '\\' 928 special = '"$' 929 930 arg = string.replace(arg, slash, slash+slash) 931 for c in special: 932 arg = string.replace(arg, c, slash+c) 933 934 if re_space.search(arg): 935 arg = '"' + arg + '"' 936 return arg 937 938 else: 939 940 # Windows does not allow special characters in file names 941 # anyway, so no need for an escape function, we will just quote 942 # the arg. 943 def escape(self, arg): 944 if re_space.search(arg): 945 arg = '"' + arg + '"' 946 return arg 947 948 def canonicalize(self, path): 949 if is_List(path): 950 path = apply(os.path.join, tuple(path)) 951 if not os.path.isabs(path): 952 path = os.path.join(self.workdir, path) 953 return path 954 955 def chmod(self, path, mode): 956 """Changes permissions on the specified file or directory 957 path name.""" 958 path = self.canonicalize(path) 959 os.chmod(path, mode) 960 961 def cleanup(self, condition = None): 962 """Removes any temporary working directories for the specified 963 TestCmd environment. If the environment variable PRESERVE was 964 set when the TestCmd environment was created, temporary working 965 directories are not removed. If any of the environment variables 966 PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set 967 when the TestCmd environment was created, then temporary working 968 directories are not removed if the test passed, failed, or had 969 no result, respectively. Temporary working directories are also 970 preserved for conditions specified via the preserve method. 971 972 Typically, this method is not called directly, but is used when 973 the script exits to clean up temporary working directories as 974 appropriate for the exit status. 975 """ 976 if not self._dirlist: 977 return 978 os.chdir(self._cwd) 979 self.workdir = None 980 if condition is None: 981 condition = self.condition 982 if self._preserve[condition]: 983 for dir in self._dirlist: 984 print "Preserved directory", dir 985 else: 986 list = self._dirlist[:] 987 list.reverse() 988 for dir in list: 989 self.writable(dir, 1) 990 shutil.rmtree(dir, ignore_errors = 1) 991 self._dirlist = [] 992 993 try: 994 global _Cleanup 995 _Cleanup.remove(self) 996 except (AttributeError, ValueError): 997 pass 998 999 def command_args(self, program = None, 1000 interpreter = None, 1001 arguments = None): 1002 if program: 1003 if type(program) == type('') and not os.path.isabs(program): 1004 program = os.path.join(self._cwd, program) 1005 else: 1006 program = self.program 1007 if not interpreter: 1008 interpreter = self.interpreter 1009 if not type(program) in [type([]), type(())]: 1010 program = [program] 1011 cmd = list(program) 1012 if interpreter: 1013 if not type(interpreter) in [type([]), type(())]: 1014 interpreter = [interpreter] 1015 cmd = list(interpreter) + cmd 1016 if arguments: 1017 if type(arguments) == type(''): 1018 arguments = string.split(arguments) 1019 cmd.extend(arguments) 1020 return cmd 1021 1022 def description_set(self, description): 1023 """Set the description of the functionality being tested. 1024 """ 1025 self.description = description 1026 1027 try: 1028 difflib 1029 except NameError: 1030 def diff(self, a, b, name, *args, **kw): 1031 print self.banner('Expected %s' % name) 1032 print a 1033 print self.banner('Actual %s' % name) 1034 print b 1035 else: 1036 def diff(self, a, b, name, *args, **kw): 1037 print self.banner(name) 1038 args = (a.splitlines(), b.splitlines()) + args 1039 lines = apply(self.diff_function, args, kw) 1040 for l in lines: 1041 print l 1042 1043 def fail_test(self, condition = 1, function = None, skip = 0): 1044 """Cause the test to fail. 1045 """ 1046 if not condition: 1047 return 1048 self.condition = 'fail_test' 1049 fail_test(self = self, 1050 condition = condition, 1051 function = function, 1052 skip = skip) 1053 1054 def interpreter_set(self, interpreter): 1055 """Set the program to be used to interpret the program 1056 under test as a script. 1057 """ 1058 self.interpreter = interpreter 1059 1060 def match(self, lines, matches): 1061 """Compare actual and expected file contents. 1062 """ 1063 return self.match_function(lines, matches) 1064 1065 def match_exact(self, lines, matches): 1066 """Compare actual and expected file contents. 1067 """ 1068 return match_exact(lines, matches) 1069 1070 def match_re(self, lines, res): 1071 """Compare actual and expected file contents. 1072 """ 1073 return match_re(lines, res) 1074 1075 def match_re_dotall(self, lines, res): 1076 """Compare actual and expected file contents. 1077 """ 1078 return match_re_dotall(lines, res) 1079 1080 def no_result(self, condition = 1, function = None, skip = 0): 1081 """Report that the test could not be run. 1082 """ 1083 if not condition: 1084 return 1085 self.condition = 'no_result' 1086 no_result(self = self, 1087 condition = condition, 1088 function = function, 1089 skip = skip) 1090 1091 def pass_test(self, condition = 1, function = None): 1092 """Cause the test to pass. 1093 """ 1094 if not condition: 1095 return 1096 self.condition = 'pass_test' 1097 pass_test(self = self, condition = condition, function = function) 1098 1099 def preserve(self, *conditions): 1100 """Arrange for the temporary working directories for the 1101 specified TestCmd environment to be preserved for one or more 1102 conditions. If no conditions are specified, arranges for 1103 the temporary working directories to be preserved for all 1104 conditions. 1105 """ 1106 if conditions is (): 1107 conditions = ('pass_test', 'fail_test', 'no_result') 1108 for cond in conditions: 1109 self._preserve[cond] = 1 1110 1111 def program_set(self, program): 1112 """Set the executable program or script to be tested. 1113 """ 1114 if program and not os.path.isabs(program): 1115 program = os.path.join(self._cwd, program) 1116 self.program = program 1117 1118 def read(self, file, mode = 'rb'): 1119 """Reads and returns the contents of the specified file name. 1120 The file name may be a list, in which case the elements are 1121 concatenated with the os.path.join() method. The file is 1122 assumed to be under the temporary working directory unless it 1123 is an absolute path name. The I/O mode for the file may 1124 be specified; it must begin with an 'r'. The default is 1125 'rb' (binary read). 1126 """ 1127 file = self.canonicalize(file) 1128 if mode[0] != 'r': 1129 raise ValueError, "mode must begin with 'r'" 1130 with open(file, mode) as f: 1131 result = f.read() 1132 return result 1133 1134 def rmdir(self, dir): 1135 """Removes the specified dir name. 1136 The dir name may be a list, in which case the elements are 1137 concatenated with the os.path.join() method. The dir is 1138 assumed to be under the temporary working directory unless it 1139 is an absolute path name. 1140 The dir must be empty. 1141 """ 1142 dir = self.canonicalize(dir) 1143 os.rmdir(dir) 1144 1145 def start(self, program = None, 1146 interpreter = None, 1147 arguments = None, 1148 universal_newlines = None, 1149 **kw): 1150 """ 1151 Starts a program or script for the test environment. 1152 1153 The specified program will have the original directory 1154 prepended unless it is enclosed in a [list]. 1155 """ 1156 cmd = self.command_args(program, interpreter, arguments) 1157 cmd_string = string.join(map(self.escape, cmd), ' ') 1158 if self.verbose: 1159 sys.stderr.write(cmd_string + "\n") 1160 if universal_newlines is None: 1161 universal_newlines = self.universal_newlines 1162 1163 # On Windows, if we make stdin a pipe when we plan to send 1164 # no input, and the test program exits before 1165 # Popen calls msvcrt.open_osfhandle, that call will fail. 1166 # So don't use a pipe for stdin if we don't need one. 1167 stdin = kw.get('stdin', None) 1168 if stdin is not None: 1169 stdin = subprocess.PIPE 1170 1171 combine = kw.get('combine', self.combine) 1172 if combine: 1173 stderr_value = subprocess.STDOUT 1174 else: 1175 stderr_value = subprocess.PIPE 1176 1177 return Popen(cmd, 1178 stdin=stdin, 1179 stdout=subprocess.PIPE, 1180 stderr=stderr_value, 1181 universal_newlines=universal_newlines) 1182 1183 def finish(self, popen, **kw): 1184 """ 1185 Finishes and waits for the process being run under control of 1186 the specified popen argument, recording the exit status, 1187 standard output and error output. 1188 """ 1189 popen.stdin.close() 1190 self.status = popen.wait() 1191 if not self.status: 1192 self.status = 0 1193 self._stdout.append(popen.stdout.read()) 1194 if popen.stderr: 1195 stderr = popen.stderr.read() 1196 else: 1197 stderr = '' 1198 self._stderr.append(stderr) 1199 1200 def run(self, program = None, 1201 interpreter = None, 1202 arguments = None, 1203 chdir = None, 1204 stdin = None, 1205 universal_newlines = None): 1206 """Runs a test of the program or script for the test 1207 environment. Standard output and error output are saved for 1208 future retrieval via the stdout() and stderr() methods. 1209 1210 The specified program will have the original directory 1211 prepended unless it is enclosed in a [list]. 1212 """ 1213 if chdir: 1214 oldcwd = os.getcwd() 1215 if not os.path.isabs(chdir): 1216 chdir = os.path.join(self.workpath(chdir)) 1217 if self.verbose: 1218 sys.stderr.write("chdir(" + chdir + ")\n") 1219 os.chdir(chdir) 1220 p = self.start(program, 1221 interpreter, 1222 arguments, 1223 universal_newlines, 1224 stdin=stdin) 1225 if stdin: 1226 if is_List(stdin): 1227 for line in stdin: 1228 p.stdin.write(line) 1229 else: 1230 p.stdin.write(stdin) 1231 p.stdin.close() 1232 1233 out = p.stdout.read() 1234 if p.stderr is None: 1235 err = '' 1236 else: 1237 err = p.stderr.read() 1238 try: 1239 close_output = p.close_output 1240 except AttributeError: 1241 p.stdout.close() 1242 if not p.stderr is None: 1243 p.stderr.close() 1244 else: 1245 close_output() 1246 1247 self._stdout.append(out) 1248 self._stderr.append(err) 1249 1250 self.status = p.wait() 1251 if not self.status: 1252 self.status = 0 1253 1254 if chdir: 1255 os.chdir(oldcwd) 1256 if self.verbose >= 2: 1257 write = sys.stdout.write 1258 write('============ STATUS: %d\n' % self.status) 1259 out = self.stdout() 1260 if out or self.verbose >= 3: 1261 write('============ BEGIN STDOUT (len=%d):\n' % len(out)) 1262 write(out) 1263 write('============ END STDOUT\n') 1264 err = self.stderr() 1265 if err or self.verbose >= 3: 1266 write('============ BEGIN STDERR (len=%d)\n' % len(err)) 1267 write(err) 1268 write('============ END STDERR\n') 1269 1270 def sleep(self, seconds = default_sleep_seconds): 1271 """Sleeps at least the specified number of seconds. If no 1272 number is specified, sleeps at least the minimum number of 1273 seconds necessary to advance file time stamps on the current 1274 system. Sleeping more seconds is all right. 1275 """ 1276 time.sleep(seconds) 1277 1278 def stderr(self, run = None): 1279 """Returns the error output from the specified run number. 1280 If there is no specified run number, then returns the error 1281 output of the last run. If the run number is less than zero, 1282 then returns the error output from that many runs back from the 1283 current run. 1284 """ 1285 if not run: 1286 run = len(self._stderr) 1287 elif run < 0: 1288 run = len(self._stderr) + run 1289 run = run - 1 1290 return self._stderr[run] 1291 1292 def stdout(self, run = None): 1293 """Returns the standard output from the specified run number. 1294 If there is no specified run number, then returns the standard 1295 output of the last run. If the run number is less than zero, 1296 then returns the standard output from that many runs back from 1297 the current run. 1298 """ 1299 if not run: 1300 run = len(self._stdout) 1301 elif run < 0: 1302 run = len(self._stdout) + run 1303 run = run - 1 1304 return self._stdout[run] 1305 1306 def subdir(self, *subdirs): 1307 """Create new subdirectories under the temporary working 1308 directory, one for each argument. An argument may be a list, 1309 in which case the list elements are concatenated using the 1310 os.path.join() method. Subdirectories multiple levels deep 1311 must be created using a separate argument for each level: 1312 1313 test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory']) 1314 1315 Returns the number of subdirectories actually created. 1316 """ 1317 count = 0 1318 for sub in subdirs: 1319 if sub is None: 1320 continue 1321 if is_List(sub): 1322 sub = apply(os.path.join, tuple(sub)) 1323 new = os.path.join(self.workdir, sub) 1324 try: 1325 os.mkdir(new) 1326 except OSError: 1327 pass 1328 else: 1329 count = count + 1 1330 return count 1331 1332 def symlink(self, target, link): 1333 """Creates a symlink to the specified target. 1334 The link name may be a list, in which case the elements are 1335 concatenated with the os.path.join() method. The link is 1336 assumed to be under the temporary working directory unless it 1337 is an absolute path name. The target is *not* assumed to be 1338 under the temporary working directory. 1339 """ 1340 link = self.canonicalize(link) 1341 os.symlink(target, link) 1342 1343 def tempdir(self, path=None): 1344 """Creates a temporary directory. 1345 A unique directory name is generated if no path name is specified. 1346 The directory is created, and will be removed when the TestCmd 1347 object is destroyed. 1348 """ 1349 if path is None: 1350 try: 1351 path = tempfile.mktemp(prefix=tempfile.template) 1352 except TypeError: 1353 path = tempfile.mktemp() 1354 os.mkdir(path) 1355 1356 # Symlinks in the path will report things 1357 # differently from os.getcwd(), so chdir there 1358 # and back to fetch the canonical path. 1359 cwd = os.getcwd() 1360 try: 1361 os.chdir(path) 1362 path = os.getcwd() 1363 finally: 1364 os.chdir(cwd) 1365 1366 # Uppercase the drive letter since the case of drive 1367 # letters is pretty much random on win32: 1368 drive,rest = os.path.splitdrive(path) 1369 if drive: 1370 path = string.upper(drive) + rest 1371 1372 # 1373 self._dirlist.append(path) 1374 global _Cleanup 1375 try: 1376 _Cleanup.index(self) 1377 except ValueError: 1378 _Cleanup.append(self) 1379 1380 return path 1381 1382 def touch(self, path, mtime=None): 1383 """Updates the modification time on the specified file or 1384 directory path name. The default is to update to the 1385 current time if no explicit modification time is specified. 1386 """ 1387 path = self.canonicalize(path) 1388 atime = os.path.getatime(path) 1389 if mtime is None: 1390 mtime = time.time() 1391 os.utime(path, (atime, mtime)) 1392 1393 def unlink(self, file): 1394 """Unlinks the specified file name. 1395 The file name may be a list, in which case the elements are 1396 concatenated with the os.path.join() method. The file is 1397 assumed to be under the temporary working directory unless it 1398 is an absolute path name. 1399 """ 1400 file = self.canonicalize(file) 1401 os.unlink(file) 1402 1403 def verbose_set(self, verbose): 1404 """Set the verbose level. 1405 """ 1406 self.verbose = verbose 1407 1408 def where_is(self, file, path=None, pathext=None): 1409 """Find an executable file. 1410 """ 1411 if is_List(file): 1412 file = apply(os.path.join, tuple(file)) 1413 if not os.path.isabs(file): 1414 file = where_is(file, path, pathext) 1415 return file 1416 1417 def workdir_set(self, path): 1418 """Creates a temporary working directory with the specified 1419 path name. If the path is a null string (''), a unique 1420 directory name is created. 1421 """ 1422 if (path != None): 1423 if path == '': 1424 path = None 1425 path = self.tempdir(path) 1426 self.workdir = path 1427 1428 def workpath(self, *args): 1429 """Returns the absolute path name to a subdirectory or file 1430 within the current temporary working directory. Concatenates 1431 the temporary working directory name with the specified 1432 arguments using the os.path.join() method. 1433 """ 1434 return apply(os.path.join, (self.workdir,) + tuple(args)) 1435 1436 def readable(self, top, read=1): 1437 """Make the specified directory tree readable (read == 1) 1438 or not (read == None). 1439 1440 This method has no effect on Windows systems, which use a 1441 completely different mechanism to control file readability. 1442 """ 1443 1444 if sys.platform == 'win32': 1445 return 1446 1447 if read: 1448 def do_chmod(fname): 1449 try: st = os.stat(fname) 1450 except OSError: pass 1451 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD)) 1452 else: 1453 def do_chmod(fname): 1454 try: st = os.stat(fname) 1455 except OSError: pass 1456 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD)) 1457 1458 if os.path.isfile(top): 1459 # If it's a file, that's easy, just chmod it. 1460 do_chmod(top) 1461 elif read: 1462 # It's a directory and we're trying to turn on read 1463 # permission, so it's also pretty easy, just chmod the 1464 # directory and then chmod every entry on our walk down the 1465 # tree. Because os.path.walk() is top-down, we'll enable 1466 # read permission on any directories that have it disabled 1467 # before os.path.walk() tries to list their contents. 1468 do_chmod(top) 1469 1470 def chmod_entries(arg, dirname, names, do_chmod=do_chmod): 1471 for n in names: 1472 do_chmod(os.path.join(dirname, n)) 1473 1474 os.path.walk(top, chmod_entries, None) 1475 else: 1476 # It's a directory and we're trying to turn off read 1477 # permission, which means we have to chmod the directoreis 1478 # in the tree bottom-up, lest disabling read permission from 1479 # the top down get in the way of being able to get at lower 1480 # parts of the tree. But os.path.walk() visits things top 1481 # down, so we just use an object to collect a list of all 1482 # of the entries in the tree, reverse the list, and then 1483 # chmod the reversed (bottom-up) list. 1484 col = Collector(top) 1485 os.path.walk(top, col, None) 1486 col.entries.reverse() 1487 for d in col.entries: do_chmod(d) 1488 1489 def writable(self, top, write=1): 1490 """Make the specified directory tree writable (write == 1) 1491 or not (write == None). 1492 """ 1493 1494 if sys.platform == 'win32': 1495 1496 if write: 1497 def do_chmod(fname): 1498 try: os.chmod(fname, stat.S_IWRITE) 1499 except OSError: pass 1500 else: 1501 def do_chmod(fname): 1502 try: os.chmod(fname, stat.S_IREAD) 1503 except OSError: pass 1504 1505 else: 1506 1507 if write: 1508 def do_chmod(fname): 1509 try: st = os.stat(fname) 1510 except OSError: pass 1511 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200)) 1512 else: 1513 def do_chmod(fname): 1514 try: st = os.stat(fname) 1515 except OSError: pass 1516 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200)) 1517 1518 if os.path.isfile(top): 1519 do_chmod(top) 1520 else: 1521 col = Collector(top) 1522 os.path.walk(top, col, None) 1523 for d in col.entries: do_chmod(d) 1524 1525 def executable(self, top, execute=1): 1526 """Make the specified directory tree executable (execute == 1) 1527 or not (execute == None). 1528 1529 This method has no effect on Windows systems, which use a 1530 completely different mechanism to control file executability. 1531 """ 1532 1533 if sys.platform == 'win32': 1534 return 1535 1536 if execute: 1537 def do_chmod(fname): 1538 try: st = os.stat(fname) 1539 except OSError: pass 1540 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC)) 1541 else: 1542 def do_chmod(fname): 1543 try: st = os.stat(fname) 1544 except OSError: pass 1545 else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC)) 1546 1547 if os.path.isfile(top): 1548 # If it's a file, that's easy, just chmod it. 1549 do_chmod(top) 1550 elif execute: 1551 # It's a directory and we're trying to turn on execute 1552 # permission, so it's also pretty easy, just chmod the 1553 # directory and then chmod every entry on our walk down the 1554 # tree. Because os.path.walk() is top-down, we'll enable 1555 # execute permission on any directories that have it disabled 1556 # before os.path.walk() tries to list their contents. 1557 do_chmod(top) 1558 1559 def chmod_entries(arg, dirname, names, do_chmod=do_chmod): 1560 for n in names: 1561 do_chmod(os.path.join(dirname, n)) 1562 1563 os.path.walk(top, chmod_entries, None) 1564 else: 1565 # It's a directory and we're trying to turn off execute 1566 # permission, which means we have to chmod the directories 1567 # in the tree bottom-up, lest disabling execute permission from 1568 # the top down get in the way of being able to get at lower 1569 # parts of the tree. But os.path.walk() visits things top 1570 # down, so we just use an object to collect a list of all 1571 # of the entries in the tree, reverse the list, and then 1572 # chmod the reversed (bottom-up) list. 1573 col = Collector(top) 1574 os.path.walk(top, col, None) 1575 col.entries.reverse() 1576 for d in col.entries: do_chmod(d) 1577 1578 def write(self, file, content, mode = 'wb'): 1579 """Writes the specified content text (second argument) to the 1580 specified file name (first argument). The file name may be 1581 a list, in which case the elements are concatenated with the 1582 os.path.join() method. The file is created under the temporary 1583 working directory. Any subdirectories in the path must already 1584 exist. The I/O mode for the file may be specified; it must 1585 begin with a 'w'. The default is 'wb' (binary write). 1586 """ 1587 file = self.canonicalize(file) 1588 if mode[0] != 'w': 1589 raise ValueError, "mode must begin with 'w'" 1590 with open(file, mode) as f: 1591 f.write(content) 1592 1593# Local Variables: 1594# tab-width:4 1595# indent-tabs-mode:nil 1596# End: 1597# vim: set expandtab tabstop=4 shiftwidth=4: 1598