1# Copyright 2009 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# 15# pylint: disable-msg=W0612,W0613,C6409 16 17"""A fake filesystem implementation for unit testing. 18 19Includes: 20 FakeFile: Provides the appearance of a real file. 21 FakeDirectory: Provides the appearance of a real dir. 22 FakeFilesystem: Provides the appearance of a real directory hierarchy. 23 FakeOsModule: Uses FakeFilesystem to provide a fake os module replacement. 24 FakePathModule: Faked os.path module replacement. 25 FakeFileOpen: Faked file() and open() function replacements. 26 27Usage: 28>>> import fake_filesystem 29>>> filesystem = fake_filesystem.FakeFilesystem() 30>>> os_module = fake_filesystem.FakeOsModule(filesystem) 31>>> pathname = '/a/new/dir/new-file' 32 33Create a new file object, creating parent directory objects as needed: 34>>> os_module.path.exists(pathname) 35False 36>>> new_file = filesystem.CreateFile(pathname) 37 38File objects can't be overwritten: 39>>> os_module.path.exists(pathname) 40True 41>>> try: 42... filesystem.CreateFile(pathname) 43... except IOError as e: 44... assert e.errno == errno.EEXIST, 'unexpected errno: %d' % e.errno 45... assert e.strerror == 'File already exists in fake filesystem' 46 47Remove a file object: 48>>> filesystem.RemoveObject(pathname) 49>>> os_module.path.exists(pathname) 50False 51 52Create a new file object at the previous path: 53>>> beatles_file = filesystem.CreateFile(pathname, 54... contents='Dear Prudence\\nWon\\'t you come out to play?\\n') 55>>> os_module.path.exists(pathname) 56True 57 58Use the FakeFileOpen class to read fake file objects: 59>>> file_module = fake_filesystem.FakeFileOpen(filesystem) 60>>> for line in file_module(pathname): 61... print line.rstrip() 62... 63Dear Prudence 64Won't you come out to play? 65 66File objects cannot be treated like directory objects: 67>>> os_module.listdir(pathname) #doctest: +NORMALIZE_WHITESPACE 68Traceback (most recent call last): 69 File "fake_filesystem.py", line 291, in listdir 70 raise OSError(errno.ENOTDIR, 71OSError: [Errno 20] Fake os module: not a directory: '/a/new/dir/new-file' 72 73The FakeOsModule can list fake directory objects: 74>>> os_module.listdir(os_module.path.dirname(pathname)) 75['new-file'] 76 77The FakeOsModule also supports stat operations: 78>>> import stat 79>>> stat.S_ISREG(os_module.stat(pathname).st_mode) 80True 81>>> stat.S_ISDIR(os_module.stat(os_module.path.dirname(pathname)).st_mode) 82True 83""" 84 85import errno 86import heapq 87import os 88import stat 89import sys 90import time 91import warnings 92try: 93 import cStringIO as io # pylint: disable-msg=C6204 94except ImportError: 95 import io # pylint: disable-msg=C6204 96 97__pychecker__ = 'no-reimportself' 98 99__version__ = '2.5' 100 101PERM_READ = 0o400 # Read permission bit. 102PERM_WRITE = 0o200 # Write permission bit. 103PERM_EXE = 0o100 # Write permission bit. 104PERM_DEF = 0o777 # Default permission bits. 105PERM_DEF_FILE = 0o666 # Default permission bits (regular file) 106PERM_ALL = 0o7777 # All permission bits. 107 108_OPEN_MODE_MAP = { 109 # mode name:(file must exist, need read, need write, 110 # truncate [implies need write], append) 111 'r': (True, True, False, False, False), 112 'w': (False, False, True, True, False), 113 'a': (False, False, True, False, True), 114 'r+': (True, True, True, False, False), 115 'w+': (False, True, True, True, False), 116 'a+': (False, True, True, False, True), 117 } 118 119_MAX_LINK_DEPTH = 20 120 121FAKE_PATH_MODULE_DEPRECATION = ('Do not instantiate a FakePathModule directly; ' 122 'let FakeOsModule instantiate it. See the ' 123 'FakeOsModule docstring for details.') 124 125 126class Error(Exception): 127 pass 128 129_is_windows = sys.platform.startswith('win') 130_is_cygwin = sys.platform == 'cygwin' 131 132if _is_windows: 133 # On Windows, raise WindowsError instead of OSError if available 134 OSError = WindowsError # pylint: disable-msg=E0602,W0622 135 136 137class FakeLargeFileIoException(Error): 138 def __init__(self, file_path): 139 Error.__init__(self, 140 'Read and write operations not supported for ' 141 'fake large file: %s' % file_path) 142 143 144def CopyModule(old): 145 """Recompiles and creates new module object.""" 146 saved = sys.modules.pop(old.__name__, None) 147 new = __import__(old.__name__) 148 sys.modules[old.__name__] = saved 149 return new 150 151 152class FakeFile(object): 153 """Provides the appearance of a real file. 154 155 Attributes currently faked out: 156 st_mode: user-specified, otherwise S_IFREG 157 st_ctime: the time.time() timestamp when the file is created. 158 st_size: the size of the file 159 160 Other attributes needed by os.stat are assigned default value of None 161 these include: st_ino, st_dev, st_nlink, st_uid, st_gid, st_atime, 162 st_mtime 163 """ 164 165 def __init__(self, name, st_mode=stat.S_IFREG | PERM_DEF_FILE, 166 contents=None): 167 """init. 168 169 Args: 170 name: name of the file/directory, without parent path information 171 st_mode: the stat.S_IF* constant representing the file type (i.e. 172 stat.S_IFREG, stat.SIFDIR) 173 contents: the contents of the filesystem object; should be a string for 174 regular files, and a list of other FakeFile or FakeDirectory objects 175 for FakeDirectory objects 176 """ 177 self.name = name 178 self.st_mode = st_mode 179 self.contents = contents 180 self.epoch = 0 181 self.st_ctime = int(time.time()) 182 self.st_atime = self.st_ctime 183 self.st_mtime = self.st_ctime 184 if contents: 185 self.st_size = len(contents) 186 else: 187 self.st_size = 0 188 # Non faked features, write setter methods for fakeing them 189 self.st_ino = None 190 self.st_dev = None 191 self.st_nlink = None 192 self.st_uid = None 193 self.st_gid = None 194 195 def SetLargeFileSize(self, st_size): 196 """Sets the self.st_size attribute and replaces self.content with None. 197 198 Provided specifically to simulate very large files without regards 199 to their content (which wouldn't fit in memory) 200 201 Args: 202 st_size: The desired file size 203 204 Raises: 205 IOError: if the st_size is not a non-negative integer 206 """ 207 # the st_size should be an positive integer value 208 if not isinstance(st_size, int) or st_size < 0: 209 raise IOError(errno.ENOSPC, 210 'Fake file object: can not create non negative integer ' 211 'size=%r fake file' % st_size, 212 self.name) 213 214 self.st_size = st_size 215 self.contents = None 216 217 def IsLargeFile(self): 218 """Return True if this file was initialized with size but no contents.""" 219 return self.contents is None 220 221 def SetContents(self, contents): 222 """Sets the file contents and size. 223 224 Args: 225 contents: string, new content of file. 226 """ 227 # convert a byte array to a string 228 if sys.version_info >= (3, 0) and isinstance(contents, bytes): 229 contents = ''.join(chr(i) for i in contents) 230 self.contents = contents 231 self.st_size = len(contents) 232 self.epoch += 1 233 234 def SetSize(self, st_size): 235 """Resizes file content, padding with nulls if new size exceeds the old. 236 237 Args: 238 st_size: The desired size for the file. 239 240 Raises: 241 IOError: if the st_size arg is not a non-negative integer 242 """ 243 244 if not isinstance(st_size, int) or st_size < 0: 245 raise IOError(errno.ENOSPC, 246 'Fake file object: can not create non negative integer ' 247 'size=%r fake file' % st_size, 248 self.name) 249 250 current_size = len(self.contents) 251 if st_size < current_size: 252 self.contents = self.contents[:st_size] 253 else: 254 self.contents = '%s%s' % (self.contents, '\0' * (st_size - current_size)) 255 self.st_size = len(self.contents) 256 self.epoch += 1 257 258 def SetATime(self, st_atime): 259 """Set the self.st_atime attribute. 260 261 Args: 262 st_atime: The desired atime. 263 """ 264 self.st_atime = st_atime 265 266 def SetMTime(self, st_mtime): 267 """Set the self.st_mtime attribute. 268 269 Args: 270 st_mtime: The desired mtime. 271 """ 272 self.st_mtime = st_mtime 273 274 def __str__(self): 275 return '%s(%o)' % (self.name, self.st_mode) 276 277 def SetIno(self, st_ino): 278 """Set the self.st_ino attribute. 279 280 Args: 281 st_ino: The desired inode. 282 """ 283 self.st_ino = st_ino 284 285 286class FakeDirectory(FakeFile): 287 """Provides the appearance of a real dir.""" 288 289 def __init__(self, name, perm_bits=PERM_DEF): 290 """init. 291 292 Args: 293 name: name of the file/directory, without parent path information 294 perm_bits: permission bits. defaults to 0o777. 295 """ 296 FakeFile.__init__(self, name, stat.S_IFDIR | perm_bits, {}) 297 298 def AddEntry(self, pathname): 299 """Adds a child FakeFile to this directory. 300 301 Args: 302 pathname: FakeFile instance to add as a child of this directory 303 """ 304 self.contents[pathname.name] = pathname 305 306 def GetEntry(self, pathname_name): 307 """Retrieves the specified child file or directory. 308 309 Args: 310 pathname_name: basename of the child object to retrieve 311 Returns: 312 string, file contents 313 Raises: 314 KeyError: if no child exists by the specified name 315 """ 316 return self.contents[pathname_name] 317 318 def RemoveEntry(self, pathname_name): 319 """Removes the specified child file or directory. 320 321 Args: 322 pathname_name: basename of the child object to remove 323 324 Raises: 325 KeyError: if no child exists by the specified name 326 """ 327 del self.contents[pathname_name] 328 329 def __str__(self): 330 rc = super(FakeDirectory, self).__str__() + ':\n' 331 for item in self.contents: 332 item_desc = self.contents[item].__str__() 333 for line in item_desc.split('\n'): 334 if line: 335 rc = rc + ' ' + line + '\n' 336 return rc 337 338 339class FakeFilesystem(object): 340 """Provides the appearance of a real directory tree for unit testing.""" 341 342 def __init__(self, path_separator=os.path.sep): 343 """init. 344 345 Args: 346 path_separator: optional substitute for os.path.sep 347 """ 348 self.path_separator = path_separator 349 self.root = FakeDirectory(self.path_separator) 350 self.cwd = self.root.name 351 # We can't query the current value without changing it: 352 self.umask = os.umask(0o22) 353 os.umask(self.umask) 354 # A list of open file objects. Their position in the list is their 355 # file descriptor number 356 self.open_files = [] 357 # A heap containing all free positions in self.open_files list 358 self.free_fd_heap = [] 359 360 def SetIno(self, path, st_ino): 361 """Set the self.st_ino attribute of file at 'path'. 362 363 Args: 364 path: Path to file. 365 st_ino: The desired inode. 366 """ 367 self.GetObject(path).SetIno(st_ino) 368 369 def AddOpenFile(self, file_obj): 370 """Adds file_obj to the list of open files on the filesystem. 371 372 The position in the self.open_files array is the file descriptor number 373 374 Args: 375 file_obj: file object to be added to open files list. 376 377 Returns: 378 File descriptor number for the file object. 379 """ 380 if self.free_fd_heap: 381 open_fd = heapq.heappop(self.free_fd_heap) 382 self.open_files[open_fd] = file_obj 383 return open_fd 384 385 self.open_files.append(file_obj) 386 return len(self.open_files) - 1 387 388 def CloseOpenFile(self, file_obj): 389 """Removes file_obj from the list of open files on the filesystem. 390 391 Sets the entry in open_files to None. 392 393 Args: 394 file_obj: file object to be removed to open files list. 395 """ 396 self.open_files[file_obj.filedes] = None 397 heapq.heappush(self.free_fd_heap, file_obj.filedes) 398 399 def GetOpenFile(self, file_des): 400 """Returns an open file. 401 402 Args: 403 file_des: file descriptor of the open file. 404 405 Raises: 406 OSError: an invalid file descriptor. 407 TypeError: filedes is not an integer. 408 409 Returns: 410 Open file object. 411 """ 412 if not isinstance(file_des, int): 413 raise TypeError('an integer is required') 414 if (file_des >= len(self.open_files) or 415 self.open_files[file_des] is None): 416 raise OSError(errno.EBADF, 'Bad file descriptor', file_des) 417 return self.open_files[file_des] 418 419 def CollapsePath(self, path): 420 """Mimics os.path.normpath using the specified path_separator. 421 422 Mimics os.path.normpath using the path_separator that was specified 423 for this FakeFilesystem. Normalizes the path, but unlike the method 424 NormalizePath, does not make it absolute. Eliminates dot components 425 (. and ..) and combines repeated path separators (//). Initial .. 426 components are left in place for relative paths. If the result is an empty 427 path, '.' is returned instead. Unlike the real os.path.normpath, this does 428 not replace '/' with '\\' on Windows. 429 430 Args: 431 path: (str) The path to normalize. 432 433 Returns: 434 (str) A copy of path with empty components and dot components removed. 435 """ 436 is_absolute_path = path.startswith(self.path_separator) 437 path_components = path.split(self.path_separator) 438 collapsed_path_components = [] 439 for component in path_components: 440 if (not component) or (component == '.'): 441 continue 442 if component == '..': 443 if collapsed_path_components and ( 444 collapsed_path_components[-1] != '..'): 445 # Remove an up-reference: directory/.. 446 collapsed_path_components.pop() 447 continue 448 elif is_absolute_path: 449 # Ignore leading .. components if starting from the root directory. 450 continue 451 collapsed_path_components.append(component) 452 collapsed_path = self.path_separator.join(collapsed_path_components) 453 if is_absolute_path: 454 collapsed_path = self.path_separator + collapsed_path 455 return collapsed_path or '.' 456 457 def NormalizePath(self, path): 458 """Absolutize and minimalize the given path. 459 460 Forces all relative paths to be absolute, and normalizes the path to 461 eliminate dot and empty components. 462 463 Args: 464 path: path to normalize 465 466 Returns: 467 The normalized path relative to the current working directory, or the root 468 directory if path is empty. 469 """ 470 if not path: 471 path = self.path_separator 472 elif not path.startswith(self.path_separator): 473 # Prefix relative paths with cwd, if cwd is not root. 474 path = self.path_separator.join( 475 (self.cwd != self.root.name and self.cwd or '', 476 path)) 477 if path == '.': 478 path = self.cwd 479 return self.CollapsePath(path) 480 481 def SplitPath(self, path): 482 """Mimics os.path.split using the specified path_separator. 483 484 Mimics os.path.split using the path_separator that was specified 485 for this FakeFilesystem. 486 487 Args: 488 path: (str) The path to split. 489 490 Returns: 491 (str) A duple (pathname, basename) for which pathname does not 492 end with a slash, and basename does not contain a slash. 493 """ 494 path_components = path.split(self.path_separator) 495 if not path_components: 496 return ('', '') 497 basename = path_components.pop() 498 if not path_components: 499 return ('', basename) 500 for component in path_components: 501 if component: 502 # The path is not the root; it contains a non-separator component. 503 # Strip all trailing separators. 504 while not path_components[-1]: 505 path_components.pop() 506 return (self.path_separator.join(path_components), basename) 507 # Root path. Collapse all leading separators. 508 return (self.path_separator, basename) 509 510 def JoinPaths(self, *paths): 511 """Mimics os.path.join using the specified path_separator. 512 513 Mimics os.path.join using the path_separator that was specified 514 for this FakeFilesystem. 515 516 Args: 517 *paths: (str) Zero or more paths to join. 518 519 Returns: 520 (str) The paths joined by the path separator, starting with the last 521 absolute path in paths. 522 """ 523 if len(paths) == 1: 524 return paths[0] 525 joined_path_segments = [] 526 for path_segment in paths: 527 if path_segment.startswith(self.path_separator): 528 # An absolute path 529 joined_path_segments = [path_segment] 530 else: 531 if (joined_path_segments and 532 not joined_path_segments[-1].endswith(self.path_separator)): 533 joined_path_segments.append(self.path_separator) 534 if path_segment: 535 joined_path_segments.append(path_segment) 536 return ''.join(joined_path_segments) 537 538 def GetPathComponents(self, path): 539 """Breaks the path into a list of component names. 540 541 Does not include the root directory as a component, as all paths 542 are considered relative to the root directory for the FakeFilesystem. 543 Callers should basically follow this pattern: 544 545 file_path = self.NormalizePath(file_path) 546 path_components = self.GetPathComponents(file_path) 547 current_dir = self.root 548 for component in path_components: 549 if component not in current_dir.contents: 550 raise IOError 551 DoStuffWithComponent(curent_dir, component) 552 current_dir = current_dir.GetEntry(component) 553 554 Args: 555 path: path to tokenize 556 557 Returns: 558 The list of names split from path 559 """ 560 if not path or path == self.root.name: 561 return [] 562 path_components = path.split(self.path_separator) 563 assert path_components 564 if not path_components[0]: 565 # This is an absolute path. 566 path_components = path_components[1:] 567 return path_components 568 569 def Exists(self, file_path): 570 """True if a path points to an existing file system object. 571 572 Args: 573 file_path: path to examine 574 575 Returns: 576 bool(if object exists) 577 578 Raises: 579 TypeError: if file_path is None 580 """ 581 if file_path is None: 582 raise TypeError 583 if not file_path: 584 return False 585 try: 586 file_path = self.ResolvePath(file_path) 587 except IOError: 588 return False 589 if file_path == self.root.name: 590 return True 591 path_components = self.GetPathComponents(file_path) 592 current_dir = self.root 593 for component in path_components: 594 if component not in current_dir.contents: 595 return False 596 current_dir = current_dir.contents[component] 597 return True 598 599 def ResolvePath(self, file_path): 600 """Follow a path, resolving symlinks. 601 602 ResolvePath traverses the filesystem along the specified file path, 603 resolving file names and symbolic links until all elements of the path are 604 exhausted, or we reach a file which does not exist. If all the elements 605 are not consumed, they just get appended to the path resolved so far. 606 This gives us the path which is as resolved as it can be, even if the file 607 does not exist. 608 609 This behavior mimics Unix semantics, and is best shown by example. Given a 610 file system that looks like this: 611 612 /a/b/ 613 /a/b/c -> /a/b2 c is a symlink to /a/b2 614 /a/b2/x 615 /a/c -> ../d 616 /a/x -> y 617 Then: 618 /a/b/x => /a/b/x 619 /a/c => /a/d 620 /a/x => /a/y 621 /a/b/c/d/e => /a/b2/d/e 622 623 Args: 624 file_path: path to examine 625 626 Returns: 627 resolved_path (string) or None 628 629 Raises: 630 TypeError: if file_path is None 631 IOError: if file_path is '' or a part of the path doesn't exist 632 """ 633 634 def _ComponentsToPath(component_folders): 635 return '%s%s' % (self.path_separator, 636 self.path_separator.join(component_folders)) 637 638 def _ValidRelativePath(file_path): 639 while file_path and '/..' in file_path: 640 file_path = file_path[:file_path.rfind('/..')] 641 if not self.Exists(self.NormalizePath(file_path)): 642 return False 643 return True 644 645 def _FollowLink(link_path_components, link): 646 """Follow a link w.r.t. a path resolved so far. 647 648 The component is either a real file, which is a no-op, or a symlink. 649 In the case of a symlink, we have to modify the path as built up so far 650 /a/b => ../c should yield /a/../c (which will normalize to /a/c) 651 /a/b => x should yield /a/x 652 /a/b => /x/y/z should yield /x/y/z 653 The modified path may land us in a new spot which is itself a 654 link, so we may repeat the process. 655 656 Args: 657 link_path_components: The resolved path built up to the link so far. 658 link: The link object itself. 659 660 Returns: 661 (string) the updated path resolved after following the link. 662 663 Raises: 664 IOError: if there are too many levels of symbolic link 665 """ 666 link_path = link.contents 667 # For links to absolute paths, we want to throw out everything in the 668 # path built so far and replace with the link. For relative links, we 669 # have to append the link to what we have so far, 670 if not link_path.startswith(self.path_separator): 671 # Relative path. Append remainder of path to what we have processed 672 # so far, excluding the name of the link itself. 673 # /a/b => ../c should yield /a/../c (which will normalize to /c) 674 # /a/b => d should yield a/d 675 components = link_path_components[:-1] 676 components.append(link_path) 677 link_path = self.path_separator.join(components) 678 # Don't call self.NormalizePath(), as we don't want to prepend self.cwd. 679 return self.CollapsePath(link_path) 680 681 if file_path is None: 682 # file.open(None) raises TypeError, so mimic that. 683 raise TypeError('Expected file system path string, received None') 684 if not file_path or not _ValidRelativePath(file_path): 685 # file.open('') raises IOError, so mimic that, and validate that all 686 # parts of a relative path exist. 687 raise IOError(errno.ENOENT, 688 'No such file or directory: \'%s\'' % file_path) 689 file_path = self.NormalizePath(file_path) 690 if file_path == self.root.name: 691 return file_path 692 693 current_dir = self.root 694 path_components = self.GetPathComponents(file_path) 695 696 resolved_components = [] 697 link_depth = 0 698 while path_components: 699 component = path_components.pop(0) 700 resolved_components.append(component) 701 if component not in current_dir.contents: 702 # The component of the path at this point does not actually exist in 703 # the folder. We can't resolve the path any more. It is legal to link 704 # to a file that does not yet exist, so rather than raise an error, we 705 # just append the remaining components to what return path we have built 706 # so far and return that. 707 resolved_components.extend(path_components) 708 break 709 current_dir = current_dir.contents[component] 710 711 # Resolve any possible symlinks in the current path component. 712 if stat.S_ISLNK(current_dir.st_mode): 713 # This link_depth check is not really meant to be an accurate check. 714 # It is just a quick hack to prevent us from looping forever on 715 # cycles. 716 link_depth += 1 717 if link_depth > _MAX_LINK_DEPTH: 718 raise IOError(errno.EMLINK, 719 'Too many levels of symbolic links: \'%s\'' % 720 _ComponentsToPath(resolved_components)) 721 link_path = _FollowLink(resolved_components, current_dir) 722 723 # Following the link might result in the complete replacement of the 724 # current_dir, so we evaluate the entire resulting path. 725 target_components = self.GetPathComponents(link_path) 726 path_components = target_components + path_components 727 resolved_components = [] 728 current_dir = self.root 729 return _ComponentsToPath(resolved_components) 730 731 def GetObjectFromNormalizedPath(self, file_path): 732 """Searches for the specified filesystem object within the fake filesystem. 733 734 Args: 735 file_path: specifies target FakeFile object to retrieve, with a 736 path that has already been normalized/resolved 737 738 Returns: 739 the FakeFile object corresponding to file_path 740 741 Raises: 742 IOError: if the object is not found 743 """ 744 if file_path == self.root.name: 745 return self.root 746 path_components = self.GetPathComponents(file_path) 747 target_object = self.root 748 try: 749 for component in path_components: 750 if not isinstance(target_object, FakeDirectory): 751 raise IOError(errno.ENOENT, 752 'No such file or directory in fake filesystem', 753 file_path) 754 target_object = target_object.GetEntry(component) 755 except KeyError: 756 raise IOError(errno.ENOENT, 757 'No such file or directory in fake filesystem', 758 file_path) 759 return target_object 760 761 def GetObject(self, file_path): 762 """Searches for the specified filesystem object within the fake filesystem. 763 764 Args: 765 file_path: specifies target FakeFile object to retrieve 766 767 Returns: 768 the FakeFile object corresponding to file_path 769 770 Raises: 771 IOError: if the object is not found 772 """ 773 file_path = self.NormalizePath(file_path) 774 return self.GetObjectFromNormalizedPath(file_path) 775 776 def ResolveObject(self, file_path): 777 """Searches for the specified filesystem object, resolving all links. 778 779 Args: 780 file_path: specifies target FakeFile object to retrieve 781 782 Returns: 783 the FakeFile object corresponding to file_path 784 785 Raises: 786 IOError: if the object is not found 787 """ 788 return self.GetObjectFromNormalizedPath(self.ResolvePath(file_path)) 789 790 def LResolveObject(self, path): 791 """Searches for the specified object, resolving only parent links. 792 793 This is analogous to the stat/lstat difference. This resolves links *to* 794 the object but not of the final object itself. 795 796 Args: 797 path: specifies target FakeFile object to retrieve 798 799 Returns: 800 the FakeFile object corresponding to path 801 802 Raises: 803 IOError: if the object is not found 804 """ 805 if path == self.root.name: 806 # The root directory will never be a link 807 return self.root 808 parent_directory, child_name = self.SplitPath(path) 809 if not parent_directory: 810 parent_directory = self.cwd 811 try: 812 parent_obj = self.ResolveObject(parent_directory) 813 assert parent_obj 814 if not isinstance(parent_obj, FakeDirectory): 815 raise IOError(errno.ENOENT, 816 'No such file or directory in fake filesystem', 817 path) 818 return parent_obj.GetEntry(child_name) 819 except KeyError: 820 raise IOError(errno.ENOENT, 821 'No such file or directory in the fake filesystem', 822 path) 823 824 def AddObject(self, file_path, file_object): 825 """Add a fake file or directory into the filesystem at file_path. 826 827 Args: 828 file_path: the path to the file to be added relative to self 829 file_object: file or directory to add 830 831 Raises: 832 IOError: if file_path does not correspond to a directory 833 """ 834 try: 835 target_directory = self.GetObject(file_path) 836 target_directory.AddEntry(file_object) 837 except AttributeError: 838 raise IOError(errno.ENOTDIR, 839 'Not a directory in the fake filesystem', 840 file_path) 841 842 def RemoveObject(self, file_path): 843 """Remove an existing file or directory. 844 845 Args: 846 file_path: the path to the file relative to self 847 848 Raises: 849 IOError: if file_path does not correspond to an existing file, or if part 850 of the path refers to something other than a directory 851 OSError: if the directory is in use (eg, if it is '/') 852 """ 853 if file_path == self.root.name: 854 raise OSError(errno.EBUSY, 'Fake device or resource busy', 855 file_path) 856 try: 857 dirname, basename = self.SplitPath(file_path) 858 target_directory = self.GetObject(dirname) 859 target_directory.RemoveEntry(basename) 860 except KeyError: 861 raise IOError(errno.ENOENT, 862 'No such file or directory in the fake filesystem', 863 file_path) 864 except AttributeError: 865 raise IOError(errno.ENOTDIR, 866 'Not a directory in the fake filesystem', 867 file_path) 868 869 def CreateDirectory(self, directory_path, perm_bits=PERM_DEF, inode=None): 870 """Creates directory_path, and all the parent directories. 871 872 Helper method to set up your test faster 873 874 Args: 875 directory_path: directory to create 876 perm_bits: permission bits 877 inode: inode of directory 878 879 Returns: 880 the newly created FakeDirectory object 881 882 Raises: 883 OSError: if the directory already exists 884 """ 885 directory_path = self.NormalizePath(directory_path) 886 if self.Exists(directory_path): 887 raise OSError(errno.EEXIST, 888 'Directory exists in fake filesystem', 889 directory_path) 890 path_components = self.GetPathComponents(directory_path) 891 current_dir = self.root 892 893 for component in path_components: 894 if component not in current_dir.contents: 895 new_dir = FakeDirectory(component, perm_bits) 896 current_dir.AddEntry(new_dir) 897 current_dir = new_dir 898 else: 899 current_dir = current_dir.contents[component] 900 901 current_dir.SetIno(inode) 902 return current_dir 903 904 def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE, 905 contents='', st_size=None, create_missing_dirs=True, 906 apply_umask=False, inode=None): 907 """Creates file_path, including all the parent directories along the way. 908 909 Helper method to set up your test faster. 910 911 Args: 912 file_path: path to the file to create 913 st_mode: the stat.S_IF constant representing the file type 914 contents: the contents of the file 915 st_size: file size; only valid if contents=None 916 create_missing_dirs: if True, auto create missing directories 917 apply_umask: whether or not the current umask must be applied on st_mode 918 inode: inode of the file 919 920 Returns: 921 the newly created FakeFile object 922 923 Raises: 924 IOError: if the file already exists 925 IOError: if the containing directory is required and missing 926 """ 927 file_path = self.NormalizePath(file_path) 928 if self.Exists(file_path): 929 raise IOError(errno.EEXIST, 930 'File already exists in fake filesystem', 931 file_path) 932 parent_directory, new_file = self.SplitPath(file_path) 933 if not parent_directory: 934 parent_directory = self.cwd 935 if not self.Exists(parent_directory): 936 if not create_missing_dirs: 937 raise IOError(errno.ENOENT, 'No such fake directory', parent_directory) 938 self.CreateDirectory(parent_directory) 939 if apply_umask: 940 st_mode &= ~self.umask 941 file_object = FakeFile(new_file, st_mode, contents) 942 file_object.SetIno(inode) 943 self.AddObject(parent_directory, file_object) 944 945 # set the size if st_size is given 946 if not contents and st_size is not None: 947 try: 948 file_object.SetLargeFileSize(st_size) 949 except IOError: 950 self.RemoveObject(file_path) 951 raise 952 953 return file_object 954 955 def CreateLink(self, file_path, link_target): 956 """Creates the specified symlink, pointed at the specified link target. 957 958 Args: 959 file_path: path to the symlink to create 960 link_target: the target of the symlink 961 962 Returns: 963 the newly created FakeFile object 964 965 Raises: 966 IOError: if the file already exists 967 """ 968 resolved_file_path = self.ResolvePath(file_path) 969 return self.CreateFile(resolved_file_path, st_mode=stat.S_IFLNK | PERM_DEF, 970 contents=link_target) 971 972 def __str__(self): 973 return str(self.root) 974 975 976class FakePathModule(object): 977 """Faked os.path module replacement. 978 979 FakePathModule should *only* be instantiated by FakeOsModule. See the 980 FakeOsModule docstring for details. 981 """ 982 _OS_PATH_COPY = CopyModule(os.path) 983 984 def __init__(self, filesystem, os_module=None): 985 """Init. 986 987 Args: 988 filesystem: FakeFilesystem used to provide file system information 989 os_module: (deprecated) FakeOsModule to assign to self.os 990 """ 991 self.filesystem = filesystem 992 self._os_path = self._OS_PATH_COPY 993 if os_module is None: 994 warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning, 995 stacklevel=2) 996 self._os_path.os = self.os = os_module 997 self.sep = self.filesystem.path_separator 998 999 def exists(self, path): 1000 """Determines whether the file object exists within the fake filesystem. 1001 1002 Args: 1003 path: path to the file object 1004 1005 Returns: 1006 bool (if file exists) 1007 """ 1008 return self.filesystem.Exists(path) 1009 1010 def lexists(self, path): 1011 """Test whether a path exists. Returns True for broken symbolic links. 1012 1013 Args: 1014 path: path to the symlnk object 1015 1016 Returns: 1017 bool (if file exists) 1018 """ 1019 return self.exists(path) or self.islink(path) 1020 1021 def getsize(self, path): 1022 """Return the file object size in bytes. 1023 1024 Args: 1025 path: path to the file object 1026 1027 Returns: 1028 file size in bytes 1029 """ 1030 file_obj = self.filesystem.GetObject(path) 1031 return file_obj.st_size 1032 1033 def _istype(self, path, st_flag): 1034 """Helper function to implement isdir(), islink(), etc. 1035 1036 See the stat(2) man page for valid stat.S_I* flag values 1037 1038 Args: 1039 path: path to file to stat and test 1040 st_flag: the stat.S_I* flag checked for the file's st_mode 1041 1042 Returns: 1043 boolean (the st_flag is set in path's st_mode) 1044 1045 Raises: 1046 TypeError: if path is None 1047 """ 1048 if path is None: 1049 raise TypeError 1050 try: 1051 obj = self.filesystem.ResolveObject(path) 1052 if obj: 1053 return stat.S_IFMT(obj.st_mode) == st_flag 1054 except IOError: 1055 return False 1056 return False 1057 1058 def isabs(self, path): 1059 if self.filesystem.path_separator == os.path.sep: 1060 # Pass through to os.path.isabs, which on Windows has special 1061 # handling for a leading drive letter. 1062 return self._os_path.isabs(path) 1063 else: 1064 return path.startswith(self.filesystem.path_separator) 1065 1066 def isdir(self, path): 1067 """Determines if path identifies a directory.""" 1068 return self._istype(path, stat.S_IFDIR) 1069 1070 def isfile(self, path): 1071 """Determines if path identifies a regular file.""" 1072 return self._istype(path, stat.S_IFREG) 1073 1074 def islink(self, path): 1075 """Determines if path identifies a symbolic link. 1076 1077 Args: 1078 path: path to filesystem object. 1079 1080 Returns: 1081 boolean (the st_flag is set in path's st_mode) 1082 1083 Raises: 1084 TypeError: if path is None 1085 """ 1086 if path is None: 1087 raise TypeError 1088 try: 1089 link_obj = self.filesystem.LResolveObject(path) 1090 return stat.S_IFMT(link_obj.st_mode) == stat.S_IFLNK 1091 except IOError: 1092 return False 1093 except KeyError: 1094 return False 1095 return False 1096 1097 def getmtime(self, path): 1098 """Returns the mtime of the file.""" 1099 try: 1100 file_obj = self.filesystem.GetObject(path) 1101 except IOError as e: 1102 raise OSError(errno.ENOENT, str(e)) 1103 return file_obj.st_mtime 1104 1105 def abspath(self, path): 1106 """Return the absolute version of a path.""" 1107 if not self.isabs(path): 1108 if sys.version_info < (3, 0) and isinstance(path, unicode): 1109 cwd = self.os.getcwdu() 1110 else: 1111 cwd = self.os.getcwd() 1112 path = self.join(cwd, path) 1113 return self.normpath(path) 1114 1115 def join(self, *p): 1116 """Returns the completed path with a separator of the parts.""" 1117 return self.filesystem.JoinPaths(*p) 1118 1119 def normpath(self, path): 1120 """Normalize path, eliminating double slashes, etc.""" 1121 return self.filesystem.CollapsePath(path) 1122 1123 if _is_windows: 1124 1125 def relpath(self, path, start=None): 1126 """ntpath.relpath() needs the cwd passed in the start argument.""" 1127 if start is None: 1128 start = self.filesystem.cwd 1129 path = self._os_path.relpath(path, start) 1130 return path.replace(self._os_path.sep, self.filesystem.path_separator) 1131 1132 realpath = abspath 1133 1134 def __getattr__(self, name): 1135 """Forwards any non-faked calls to os.path.""" 1136 return self._os_path.__dict__[name] 1137 1138 1139class FakeOsModule(object): 1140 """Uses FakeFilesystem to provide a fake os module replacement. 1141 1142 Do not create os.path separately from os, as there is a necessary circular 1143 dependency between os and os.path to replicate the behavior of the standard 1144 Python modules. What you want to do is to just let FakeOsModule take care of 1145 os.path setup itself. 1146 1147 # You always want to do this. 1148 filesystem = fake_filesystem.FakeFilesystem() 1149 my_os_module = fake_filesystem.FakeOsModule(filesystem) 1150 """ 1151 1152 def __init__(self, filesystem, os_path_module=None): 1153 """Also exposes self.path (to fake os.path). 1154 1155 Args: 1156 filesystem: FakeFilesystem used to provide file system information 1157 os_path_module: (deprecated) optional FakePathModule instance 1158 """ 1159 self.filesystem = filesystem 1160 self.sep = filesystem.path_separator 1161 self._os_module = os 1162 if os_path_module is None: 1163 self.path = FakePathModule(self.filesystem, self) 1164 else: 1165 warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning, 1166 stacklevel=2) 1167 self.path = os_path_module 1168 if sys.version_info < (3, 0): 1169 self.fdopen = self._fdopen_ver2 1170 else: 1171 self.fdopen = self._fdopen 1172 1173 def _fdopen(self, *args, **kwargs): 1174 """Redirector to open() builtin function. 1175 1176 Args: 1177 *args: pass through args 1178 **kwargs: pass through kwargs 1179 1180 Returns: 1181 File object corresponding to file_des. 1182 1183 Raises: 1184 TypeError: if file descriptor is not an integer. 1185 """ 1186 if not isinstance(args[0], int): 1187 raise TypeError('an integer is required') 1188 return FakeFileOpen(self.filesystem)(*args, **kwargs) 1189 1190 def _fdopen_ver2(self, file_des, mode='r', bufsize=None): 1191 """Returns an open file object connected to the file descriptor file_des. 1192 1193 Args: 1194 file_des: An integer file descriptor for the file object requested. 1195 mode: additional file flags. Currently checks to see if the mode matches 1196 the mode of the requested file object. 1197 bufsize: ignored. (Used for signature compliance with __builtin__.fdopen) 1198 1199 Returns: 1200 File object corresponding to file_des. 1201 1202 Raises: 1203 OSError: if bad file descriptor or incompatible mode is given. 1204 TypeError: if file descriptor is not an integer. 1205 """ 1206 if not isinstance(file_des, int): 1207 raise TypeError('an integer is required') 1208 1209 try: 1210 return FakeFileOpen(self.filesystem).Call(file_des, mode=mode) 1211 except IOError as e: 1212 raise OSError(e) 1213 1214 def open(self, file_path, flags, mode=None): 1215 """Returns the file descriptor for a FakeFile. 1216 1217 WARNING: This implementation only implements creating a file. Please fill 1218 out the remainder for your needs. 1219 1220 Args: 1221 file_path: the path to the file 1222 flags: low-level bits to indicate io operation 1223 mode: bits to define default permissions 1224 1225 Returns: 1226 A file descriptor. 1227 1228 Raises: 1229 OSError: if the path cannot be found 1230 ValueError: if invalid mode is given 1231 NotImplementedError: if an unsupported flag is passed in 1232 """ 1233 if flags & os.O_CREAT: 1234 fake_file = FakeFileOpen(self.filesystem)(file_path, 'w') 1235 if mode: 1236 self.chmod(file_path, mode) 1237 return fake_file.fileno() 1238 else: 1239 raise NotImplementedError('FakeOsModule.open') 1240 1241 def close(self, file_des): 1242 """Closes a file descriptor. 1243 1244 Args: 1245 file_des: An integer file descriptor for the file object requested. 1246 1247 Raises: 1248 OSError: bad file descriptor. 1249 TypeError: if file descriptor is not an integer. 1250 """ 1251 fh = self.filesystem.GetOpenFile(file_des) 1252 fh.close() 1253 1254 def read(self, file_des, num_bytes): 1255 """Reads number of bytes from a file descriptor, returns bytes read. 1256 1257 Args: 1258 file_des: An integer file descriptor for the file object requested. 1259 num_bytes: Number of bytes to read from file. 1260 1261 Returns: 1262 Bytes read from file. 1263 1264 Raises: 1265 OSError: bad file descriptor. 1266 TypeError: if file descriptor is not an integer. 1267 """ 1268 fh = self.filesystem.GetOpenFile(file_des) 1269 return fh.read(num_bytes) 1270 1271 def write(self, file_des, contents): 1272 """Writes string to file descriptor, returns number of bytes written. 1273 1274 Args: 1275 file_des: An integer file descriptor for the file object requested. 1276 contents: String of bytes to write to file. 1277 1278 Returns: 1279 Number of bytes written. 1280 1281 Raises: 1282 OSError: bad file descriptor. 1283 TypeError: if file descriptor is not an integer. 1284 """ 1285 fh = self.filesystem.GetOpenFile(file_des) 1286 fh.write(contents) 1287 fh.flush() 1288 return len(contents) 1289 1290 def fstat(self, file_des): 1291 """Returns the os.stat-like tuple for the FakeFile object of file_des. 1292 1293 Args: 1294 file_des: file descriptor of filesystem object to retrieve 1295 1296 Returns: 1297 the os.stat_result object corresponding to entry_path 1298 1299 Raises: 1300 OSError: if the filesystem object doesn't exist. 1301 """ 1302 # stat should return the tuple representing return value of os.stat 1303 stats = self.filesystem.GetOpenFile(file_des).GetObject() 1304 st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev, 1305 stats.st_nlink, stats.st_uid, stats.st_gid, 1306 stats.st_size, stats.st_atime, 1307 stats.st_mtime, stats.st_ctime)) 1308 return st_obj 1309 1310 def _ConfirmDir(self, target_directory): 1311 """Tests that the target is actually a directory, raising OSError if not. 1312 1313 Args: 1314 target_directory: path to the target directory within the fake 1315 filesystem 1316 1317 Returns: 1318 the FakeFile object corresponding to target_directory 1319 1320 Raises: 1321 OSError: if the target is not a directory 1322 """ 1323 try: 1324 directory = self.filesystem.GetObject(target_directory) 1325 except IOError as e: 1326 raise OSError(e.errno, e.strerror, target_directory) 1327 if not directory.st_mode & stat.S_IFDIR: 1328 raise OSError(errno.ENOTDIR, 1329 'Fake os module: not a directory', 1330 target_directory) 1331 return directory 1332 1333 def umask(self, new_mask): 1334 """Change the current umask. 1335 1336 Args: 1337 new_mask: An integer. 1338 1339 Returns: 1340 The old mask. 1341 1342 Raises: 1343 TypeError: new_mask is of an invalid type. 1344 """ 1345 if not isinstance(new_mask, int): 1346 raise TypeError('an integer is required') 1347 old_umask = self.filesystem.umask 1348 self.filesystem.umask = new_mask 1349 return old_umask 1350 1351 def chdir(self, target_directory): 1352 """Change current working directory to target directory. 1353 1354 Args: 1355 target_directory: path to new current working directory 1356 1357 Raises: 1358 OSError: if user lacks permission to enter the argument directory or if 1359 the target is not a directory 1360 """ 1361 target_directory = self.filesystem.ResolvePath(target_directory) 1362 self._ConfirmDir(target_directory) 1363 directory = self.filesystem.GetObject(target_directory) 1364 # A full implementation would check permissions all the way up the tree. 1365 if not directory.st_mode | PERM_EXE: 1366 raise OSError(errno.EACCES, 'Fake os module: permission denied', 1367 directory) 1368 self.filesystem.cwd = target_directory 1369 1370 def getcwd(self): 1371 """Return current working directory.""" 1372 return self.filesystem.cwd 1373 1374 def getcwdu(self): 1375 """Return current working directory. Deprecated in Python 3.""" 1376 if sys.version_info >= (3, 0): 1377 raise AttributeError('no attribute getcwdu') 1378 return unicode(self.filesystem.cwd) 1379 1380 def listdir(self, target_directory): 1381 """Returns a sorted list of filenames in target_directory. 1382 1383 Args: 1384 target_directory: path to the target directory within the fake 1385 filesystem 1386 1387 Returns: 1388 a sorted list of file names within the target directory 1389 1390 Raises: 1391 OSError: if the target is not a directory 1392 """ 1393 target_directory = self.filesystem.ResolvePath(target_directory) 1394 directory = self._ConfirmDir(target_directory) 1395 return sorted(directory.contents) 1396 1397 def _ClassifyDirectoryContents(self, root): 1398 """Classify contents of a directory as files/directories. 1399 1400 Args: 1401 root: (str) Directory to examine. 1402 1403 Returns: 1404 (tuple) A tuple consisting of three values: the directory examined, a 1405 list containing all of the directory entries, and a list containing all 1406 of the non-directory entries. (This is the same format as returned by 1407 the os.walk generator.) 1408 1409 Raises: 1410 Nothing on its own, but be ready to catch exceptions generated by 1411 underlying mechanisms like os.listdir. 1412 """ 1413 dirs = [] 1414 files = [] 1415 for entry in self.listdir(root): 1416 if self.path.isdir(self.path.join(root, entry)): 1417 dirs.append(entry) 1418 else: 1419 files.append(entry) 1420 return (root, dirs, files) 1421 1422 def walk(self, top, topdown=True, onerror=None): 1423 """Performs an os.walk operation over the fake filesystem. 1424 1425 Args: 1426 top: root directory from which to begin walk 1427 topdown: determines whether to return the tuples with the root as the 1428 first entry (True) or as the last, after all the child directory 1429 tuples (False) 1430 onerror: if not None, function which will be called to handle the 1431 os.error instance provided when os.listdir() fails 1432 1433 Yields: 1434 (path, directories, nondirectories) for top and each of its 1435 subdirectories. See the documentation for the builtin os module for 1436 further details. 1437 """ 1438 top = self.path.normpath(top) 1439 try: 1440 top_contents = self._ClassifyDirectoryContents(top) 1441 except OSError as e: 1442 top_contents = None 1443 if onerror is not None: 1444 onerror(e) 1445 1446 if top_contents is not None: 1447 if topdown: 1448 yield top_contents 1449 1450 for directory in top_contents[1]: 1451 for contents in self.walk(self.path.join(top, directory), 1452 topdown=topdown, onerror=onerror): 1453 yield contents 1454 1455 if not topdown: 1456 yield top_contents 1457 1458 def readlink(self, path): 1459 """Reads the target of a symlink. 1460 1461 Args: 1462 path: symlink to read the target of 1463 1464 Returns: 1465 the string representing the path to which the symbolic link points. 1466 1467 Raises: 1468 TypeError: if path is None 1469 OSError: (with errno=ENOENT) if path is not a valid path, or 1470 (with errno=EINVAL) if path is valid, but is not a symlink 1471 """ 1472 if path is None: 1473 raise TypeError 1474 try: 1475 link_obj = self.filesystem.LResolveObject(path) 1476 except IOError: 1477 raise OSError(errno.ENOENT, 'Fake os module: path does not exist', path) 1478 if stat.S_IFMT(link_obj.st_mode) != stat.S_IFLNK: 1479 raise OSError(errno.EINVAL, 'Fake os module: not a symlink', path) 1480 return link_obj.contents 1481 1482 def stat(self, entry_path): 1483 """Returns the os.stat-like tuple for the FakeFile object of entry_path. 1484 1485 Args: 1486 entry_path: path to filesystem object to retrieve 1487 1488 Returns: 1489 the os.stat_result object corresponding to entry_path 1490 1491 Raises: 1492 OSError: if the filesystem object doesn't exist. 1493 """ 1494 # stat should return the tuple representing return value of os.stat 1495 try: 1496 stats = self.filesystem.ResolveObject(entry_path) 1497 st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev, 1498 stats.st_nlink, stats.st_uid, stats.st_gid, 1499 stats.st_size, stats.st_atime, 1500 stats.st_mtime, stats.st_ctime)) 1501 return st_obj 1502 except IOError as io_error: 1503 raise OSError(io_error.errno, io_error.strerror, entry_path) 1504 1505 def lstat(self, entry_path): 1506 """Returns the os.stat-like tuple for entry_path, not following symlinks. 1507 1508 Args: 1509 entry_path: path to filesystem object to retrieve 1510 1511 Returns: 1512 the os.stat_result object corresponding to entry_path 1513 1514 Raises: 1515 OSError: if the filesystem object doesn't exist. 1516 """ 1517 # stat should return the tuple representing return value of os.stat 1518 try: 1519 stats = self.filesystem.LResolveObject(entry_path) 1520 st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev, 1521 stats.st_nlink, stats.st_uid, stats.st_gid, 1522 stats.st_size, stats.st_atime, 1523 stats.st_mtime, stats.st_ctime)) 1524 return st_obj 1525 except IOError as io_error: 1526 raise OSError(io_error.errno, io_error.strerror, entry_path) 1527 1528 def remove(self, path): 1529 """Removes the FakeFile object representing the specified file.""" 1530 path = self.filesystem.NormalizePath(path) 1531 if self.path.isdir(path) and not self.path.islink(path): 1532 raise OSError(errno.EISDIR, "Is a directory: '%s'" % path) 1533 try: 1534 self.filesystem.RemoveObject(path) 1535 except IOError as e: 1536 raise OSError(e.errno, e.strerror, e.filename) 1537 1538 # As per the documentation unlink = remove. 1539 unlink = remove 1540 1541 def rename(self, old_file, new_file): 1542 """Adds a FakeFile object at new_file containing contents of old_file. 1543 1544 Also removes the FakeFile object for old_file, and replaces existing 1545 new_file object, if one existed. 1546 1547 Args: 1548 old_file: path to filesystem object to rename 1549 new_file: path to where the filesystem object will live after this call 1550 1551 Raises: 1552 OSError: if old_file does not exist. 1553 IOError: if dirname(new_file) does not exist 1554 """ 1555 old_file = self.filesystem.NormalizePath(old_file) 1556 new_file = self.filesystem.NormalizePath(new_file) 1557 if not self.filesystem.Exists(old_file): 1558 raise OSError(errno.ENOENT, 1559 'Fake os object: can not rename nonexistent file ' 1560 'with name', 1561 old_file) 1562 if self.filesystem.Exists(new_file): 1563 if old_file == new_file: 1564 return None # Nothing to do here. 1565 else: 1566 self.remove(new_file) 1567 old_dir, old_name = self.path.split(old_file) 1568 new_dir, new_name = self.path.split(new_file) 1569 if not self.filesystem.Exists(new_dir): 1570 raise IOError(errno.ENOENT, 'No such fake directory', new_dir) 1571 old_dir_object = self.filesystem.ResolveObject(old_dir) 1572 old_object = old_dir_object.GetEntry(old_name) 1573 old_object_mtime = old_object.st_mtime 1574 new_dir_object = self.filesystem.ResolveObject(new_dir) 1575 if old_object.st_mode & stat.S_IFDIR: 1576 old_object.name = new_name 1577 new_dir_object.AddEntry(old_object) 1578 old_dir_object.RemoveEntry(old_name) 1579 else: 1580 self.filesystem.CreateFile(new_file, 1581 st_mode=old_object.st_mode, 1582 contents=old_object.contents, 1583 create_missing_dirs=False) 1584 self.remove(old_file) 1585 new_object = self.filesystem.GetObject(new_file) 1586 new_object.SetMTime(old_object_mtime) 1587 self.chown(new_file, old_object.st_uid, old_object.st_gid) 1588 1589 def rmdir(self, target_directory): 1590 """Remove a leaf Fake directory. 1591 1592 Args: 1593 target_directory: (str) Name of directory to remove. 1594 1595 Raises: 1596 OSError: if target_directory does not exist or is not a directory, 1597 or as per FakeFilesystem.RemoveObject. Cannot remove '.'. 1598 """ 1599 if target_directory == '.': 1600 raise OSError(errno.EINVAL, 'Invalid argument: \'.\'') 1601 target_directory = self.filesystem.NormalizePath(target_directory) 1602 if self._ConfirmDir(target_directory): 1603 if self.listdir(target_directory): 1604 raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty', 1605 target_directory) 1606 try: 1607 self.filesystem.RemoveObject(target_directory) 1608 except IOError as e: 1609 raise OSError(e.errno, e.strerror, e.filename) 1610 1611 def removedirs(self, target_directory): 1612 """Remove a leaf Fake directory and all empty intermediate ones.""" 1613 target_directory = self.filesystem.NormalizePath(target_directory) 1614 directory = self._ConfirmDir(target_directory) 1615 if directory.contents: 1616 raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty', 1617 self.path.basename(target_directory)) 1618 else: 1619 self.rmdir(target_directory) 1620 head, tail = self.path.split(target_directory) 1621 if not tail: 1622 head, tail = self.path.split(head) 1623 while head and tail: 1624 head_dir = self._ConfirmDir(head) 1625 if head_dir.contents: 1626 break 1627 self.rmdir(head) 1628 head, tail = self.path.split(head) 1629 1630 def mkdir(self, dir_name, mode=PERM_DEF): 1631 """Create a leaf Fake directory. 1632 1633 Args: 1634 dir_name: (str) Name of directory to create. Relative paths are assumed 1635 to be relative to '/'. 1636 mode: (int) Mode to create directory with. This argument defaults to 1637 0o777. The umask is applied to this mode. 1638 1639 Raises: 1640 OSError: if the directory name is invalid or parent directory is read only 1641 or as per FakeFilesystem.AddObject. 1642 """ 1643 if dir_name.endswith(self.sep): 1644 dir_name = dir_name[:-1] 1645 1646 parent_dir, _ = self.path.split(dir_name) 1647 if parent_dir: 1648 base_dir = self.path.normpath(parent_dir) 1649 if parent_dir.endswith(self.sep + '..'): 1650 base_dir, unused_dotdot, _ = parent_dir.partition(self.sep + '..') 1651 if not self.filesystem.Exists(base_dir): 1652 raise OSError(errno.ENOENT, 'No such fake directory', base_dir) 1653 1654 dir_name = self.filesystem.NormalizePath(dir_name) 1655 if self.filesystem.Exists(dir_name): 1656 raise OSError(errno.EEXIST, 'Fake object already exists', dir_name) 1657 head, tail = self.path.split(dir_name) 1658 directory_object = self.filesystem.GetObject(head) 1659 if not directory_object.st_mode & PERM_WRITE: 1660 raise OSError(errno.EACCES, 'Permission Denied', dir_name) 1661 1662 self.filesystem.AddObject( 1663 head, FakeDirectory(tail, mode & ~self.filesystem.umask)) 1664 1665 def makedirs(self, dir_name, mode=PERM_DEF): 1666 """Create a leaf Fake directory + create any non-existent parent dirs. 1667 1668 Args: 1669 dir_name: (str) Name of directory to create. 1670 mode: (int) Mode to create directory (and any necessary parent 1671 directories) with. This argument defaults to 0o777. The umask is 1672 applied to this mode. 1673 1674 Raises: 1675 OSError: if the directory already exists or as per 1676 FakeFilesystem.CreateDirectory 1677 """ 1678 dir_name = self.filesystem.NormalizePath(dir_name) 1679 path_components = self.filesystem.GetPathComponents(dir_name) 1680 1681 # Raise a permission denied error if the first existing directory is not 1682 # writeable. 1683 current_dir = self.filesystem.root 1684 for component in path_components: 1685 if component not in current_dir.contents: 1686 if not current_dir.st_mode & PERM_WRITE: 1687 raise OSError(errno.EACCES, 'Permission Denied', dir_name) 1688 else: 1689 break 1690 else: 1691 current_dir = current_dir.contents[component] 1692 1693 self.filesystem.CreateDirectory(dir_name, mode & ~self.filesystem.umask) 1694 1695 def access(self, path, mode): 1696 """Check if a file exists and has the specified permissions. 1697 1698 Args: 1699 path: (str) Path to the file. 1700 mode: (int) Permissions represented as a bitwise-OR combination of 1701 os.F_OK, os.R_OK, os.W_OK, and os.X_OK. 1702 Returns: 1703 boolean, True if file is accessible, False otherwise 1704 """ 1705 try: 1706 st = self.stat(path) 1707 except OSError as os_error: 1708 if os_error.errno == errno.ENOENT: 1709 return False 1710 raise 1711 return (mode & ((st.st_mode >> 6) & 7)) == mode 1712 1713 def chmod(self, path, mode): 1714 """Change the permissions of a file as encoded in integer mode. 1715 1716 Args: 1717 path: (str) Path to the file. 1718 mode: (int) Permissions 1719 """ 1720 try: 1721 file_object = self.filesystem.GetObject(path) 1722 except IOError as io_error: 1723 if io_error.errno == errno.ENOENT: 1724 raise OSError(errno.ENOENT, 1725 'No such file or directory in fake filesystem', 1726 path) 1727 raise 1728 file_object.st_mode = ((file_object.st_mode & ~PERM_ALL) | 1729 (mode & PERM_ALL)) 1730 file_object.st_ctime = int(time.time()) 1731 1732 def utime(self, path, times): 1733 """Change the access and modified times of a file. 1734 1735 Args: 1736 path: (str) Path to the file. 1737 times: 2-tuple of numbers, of the form (atime, mtime) which is used to set 1738 the access and modified times, respectively. If None, file's access 1739 and modified times are set to the current time. 1740 1741 Raises: 1742 TypeError: If anything other than integers is specified in passed tuple or 1743 number of elements in the tuple is not equal to 2. 1744 """ 1745 try: 1746 file_object = self.filesystem.GetObject(path) 1747 except IOError as io_error: 1748 if io_error.errno == errno.ENOENT: 1749 raise OSError(errno.ENOENT, 1750 'No such file or directory in fake filesystem', 1751 path) 1752 raise 1753 if times is None: 1754 file_object.st_atime = int(time.time()) 1755 file_object.st_mtime = int(time.time()) 1756 else: 1757 if len(times) != 2: 1758 raise TypeError('utime() arg 2 must be a tuple (atime, mtime)') 1759 for t in times: 1760 if not isinstance(t, (int, float)): 1761 raise TypeError('an integer is required') 1762 1763 file_object.st_atime = times[0] 1764 file_object.st_mtime = times[1] 1765 1766 def chown(self, path, uid, gid): 1767 """Set ownership of a faked file. 1768 1769 Args: 1770 path: (str) Path to the file or directory. 1771 uid: (int) Numeric uid to set the file or directory to. 1772 gid: (int) Numeric gid to set the file or directory to. 1773 """ 1774 try: 1775 file_object = self.filesystem.GetObject(path) 1776 except IOError as io_error: 1777 if io_error.errno == errno.ENOENT: 1778 raise OSError(errno.ENOENT, 1779 'No such file or directory in fake filesystem', 1780 path) 1781 raise 1782 if uid != -1: 1783 file_object.st_uid = uid 1784 if gid != -1: 1785 file_object.st_gid = gid 1786 1787 def mknod(self, filename, mode=None, device=None): 1788 """Create a filesystem node named 'filename'. 1789 1790 Does not support device special files or named pipes as the real os 1791 module does. 1792 1793 Args: 1794 filename: (str) Name of the file to create 1795 mode: (int) permissions to use and type of file to be created. 1796 Default permissions are 0o666. Only the stat.S_IFREG file type 1797 is supported by the fake implementation. The umask is applied 1798 to this mode. 1799 device: not supported in fake implementation 1800 1801 Raises: 1802 OSError: if called with unsupported options or the file can not be 1803 created. 1804 """ 1805 if mode is None: 1806 mode = stat.S_IFREG | PERM_DEF_FILE 1807 if device or not mode & stat.S_IFREG: 1808 raise OSError(errno.EINVAL, 1809 'Fake os mknod implementation only supports ' 1810 'regular files.') 1811 1812 head, tail = self.path.split(filename) 1813 if not tail: 1814 if self.filesystem.Exists(head): 1815 raise OSError(errno.EEXIST, 'Fake filesystem: %s: %s' % ( 1816 os.strerror(errno.EEXIST), filename)) 1817 raise OSError(errno.ENOENT, 'Fake filesystem: %s: %s' % ( 1818 os.strerror(errno.ENOENT), filename)) 1819 if tail == '.' or tail == '..' or self.filesystem.Exists(filename): 1820 raise OSError(errno.EEXIST, 'Fake fileystem: %s: %s' % ( 1821 os.strerror(errno.EEXIST), filename)) 1822 try: 1823 self.filesystem.AddObject(head, FakeFile(tail, 1824 mode & ~self.filesystem.umask)) 1825 except IOError: 1826 raise OSError(errno.ENOTDIR, 'Fake filesystem: %s: %s' % ( 1827 os.strerror(errno.ENOTDIR), filename)) 1828 1829 def symlink(self, link_target, path): 1830 """Creates the specified symlink, pointed at the specified link target. 1831 1832 Args: 1833 link_target: the target of the symlink 1834 path: path to the symlink to create 1835 1836 Returns: 1837 None 1838 1839 Raises: 1840 IOError: if the file already exists 1841 """ 1842 self.filesystem.CreateLink(path, link_target) 1843 1844 # pylint: disable-msg=C6002 1845 # TODO: Link doesn't behave like os.link, this needs to be fixed properly. 1846 link = symlink 1847 1848 def __getattr__(self, name): 1849 """Forwards any unfaked calls to the standard os module.""" 1850 return getattr(self._os_module, name) 1851 1852 1853class FakeFileOpen(object): 1854 """Faked file() and open() function replacements. 1855 1856 Returns FakeFile objects in a FakeFilesystem in place of the file() 1857 or open() function. 1858 """ 1859 1860 def __init__(self, filesystem, delete_on_close=False): 1861 """init. 1862 1863 Args: 1864 filesystem: FakeFilesystem used to provide file system information 1865 delete_on_close: optional boolean, deletes file on close() 1866 """ 1867 self.filesystem = filesystem 1868 self._delete_on_close = delete_on_close 1869 1870 def __call__(self, *args, **kwargs): 1871 """Redirects calls to file() or open() to appropriate method.""" 1872 if sys.version_info < (3, 0): 1873 return self._call_ver2(*args, **kwargs) 1874 else: 1875 return self.Call(*args, **kwargs) 1876 1877 def _call_ver2(self, file_path, mode='r', buffering=-1, flags=None): 1878 """Limits args of open() or file() for Python 2.x versions.""" 1879 # Backwards compatibility, mode arg used to be named flags 1880 mode = flags or mode 1881 return self.Call(file_path, mode, buffering) 1882 1883 def Call(self, file_, mode='r', buffering=-1, encoding=None, 1884 errors=None, newline=None, closefd=True, opener=None): 1885 """Returns a StringIO object with the contents of the target file object. 1886 1887 Args: 1888 file_: path to target file or a file descriptor 1889 mode: additional file modes. All r/w/a r+/w+/a+ modes are supported. 1890 't', and 'U' are ignored, e.g., 'wU' is treated as 'w'. 'b' sets 1891 binary mode, no end of line translations in StringIO. 1892 buffering: ignored. (Used for signature compliance with __builtin__.open) 1893 encoding: ignored, strings have no encoding 1894 errors: ignored, this relates to encoding 1895 newline: controls universal newlines, passed to StringIO object 1896 closefd: if a file descriptor rather than file name is passed, and set 1897 to false, then the file descriptor is kept open when file is closed 1898 opener: not supported 1899 1900 Returns: 1901 a StringIO object containing the contents of the target file 1902 1903 Raises: 1904 IOError: if the target object is a directory, the path is invalid or 1905 permission is denied. 1906 """ 1907 orig_modes = mode # Save original mdoes for error messages. 1908 # Binary mode for non 3.x or set by mode 1909 binary = sys.version_info < (3, 0) or 'b' in mode 1910 # Normalize modes. Ignore 't' and 'U'. 1911 mode = mode.replace('t', '').replace('b', '') 1912 mode = mode.replace('rU', 'r').replace('U', 'r') 1913 1914 if mode not in _OPEN_MODE_MAP: 1915 raise IOError('Invalid mode: %r' % orig_modes) 1916 1917 must_exist, need_read, need_write, truncate, append = _OPEN_MODE_MAP[mode] 1918 1919 file_object = None 1920 filedes = None 1921 # opening a file descriptor 1922 if isinstance(file_, int): 1923 filedes = file_ 1924 file_object = self.filesystem.GetOpenFile(filedes).GetObject() 1925 file_path = file_object.name 1926 else: 1927 file_path = file_ 1928 real_path = self.filesystem.ResolvePath(file_path) 1929 if self.filesystem.Exists(file_path): 1930 file_object = self.filesystem.GetObjectFromNormalizedPath(real_path) 1931 closefd = True 1932 1933 if file_object: 1934 if ((need_read and not file_object.st_mode & PERM_READ) or 1935 (need_write and not file_object.st_mode & PERM_WRITE)): 1936 raise IOError(errno.EACCES, 'Permission denied', file_path) 1937 if need_write: 1938 file_object.st_ctime = int(time.time()) 1939 if truncate: 1940 file_object.SetContents('') 1941 else: 1942 if must_exist: 1943 raise IOError(errno.ENOENT, 'No such file or directory', file_path) 1944 file_object = self.filesystem.CreateFile( 1945 real_path, create_missing_dirs=False, apply_umask=True) 1946 1947 if file_object.st_mode & stat.S_IFDIR: 1948 raise IOError(errno.EISDIR, 'Fake file object: is a directory', file_path) 1949 1950 class FakeFileWrapper(object): 1951 """Wrapper for a StringIO object for use by a FakeFile object. 1952 1953 If the wrapper has any data written to it, it will propagate to 1954 the FakeFile object on close() or flush(). 1955 """ 1956 if sys.version_info < (3, 0): 1957 _OPERATION_ERROR = IOError 1958 else: 1959 _OPERATION_ERROR = io.UnsupportedOperation 1960 1961 def __init__(self, file_object, update=False, read=False, append=False, 1962 delete_on_close=False, filesystem=None, newline=None, 1963 binary=True, closefd=True): 1964 self._file_object = file_object 1965 self._append = append 1966 self._read = read 1967 self._update = update 1968 self._closefd = closefd 1969 self._file_epoch = file_object.epoch 1970 contents = file_object.contents 1971 newline_arg = {} if binary else {'newline': newline} 1972 io_class = io.StringIO 1973 # For Python 3, files opened as binary only read/write byte contents. 1974 if sys.version_info >= (3, 0) and binary: 1975 io_class = io.BytesIO 1976 if contents and isinstance(contents, str): 1977 contents = bytes(contents, 'ascii') 1978 if contents: 1979 if update: 1980 self._io = io_class(**newline_arg) 1981 self._io.write(contents) 1982 if not append: 1983 self._io.seek(0) 1984 else: 1985 self._read_whence = 0 1986 if read: 1987 self._read_seek = 0 1988 else: 1989 self._read_seek = self._io.tell() 1990 else: 1991 self._io = io_class(contents, **newline_arg) 1992 else: 1993 self._io = io_class(**newline_arg) 1994 self._read_whence = 0 1995 self._read_seek = 0 1996 if delete_on_close: 1997 assert filesystem, 'delete_on_close=True requires filesystem=' 1998 self._filesystem = filesystem 1999 self._delete_on_close = delete_on_close 2000 # override, don't modify FakeFile.name, as FakeFilesystem expects 2001 # it to be the file name only, no directories. 2002 self.name = file_object.opened_as 2003 2004 def __enter__(self): 2005 """To support usage of this fake file with the 'with' statement.""" 2006 return self 2007 2008 def __exit__(self, type, value, traceback): # pylint: disable-msg=W0622 2009 """To support usage of this fake file with the 'with' statement.""" 2010 self.close() 2011 2012 def GetObject(self): 2013 """Returns FakeFile object that is wrapped by current class.""" 2014 return self._file_object 2015 2016 def fileno(self): 2017 """Returns file descriptor of file object.""" 2018 return self.filedes 2019 2020 def close(self): 2021 """File close.""" 2022 if self._update: 2023 self._file_object.SetContents(self._io.getvalue()) 2024 if self._closefd: 2025 self._filesystem.CloseOpenFile(self) 2026 if self._delete_on_close: 2027 self._filesystem.RemoveObject(self.name) 2028 2029 def flush(self): 2030 """Flush file contents to 'disk'.""" 2031 if self._update: 2032 self._file_object.SetContents(self._io.getvalue()) 2033 self._file_epoch = self._file_object.epoch 2034 2035 def seek(self, offset, whence=0): 2036 """Move read/write pointer in 'file'.""" 2037 if not self._append: 2038 self._io.seek(offset, whence) 2039 else: 2040 self._read_seek = offset 2041 self._read_whence = whence 2042 2043 def tell(self): 2044 """Return the file's current position. 2045 2046 Returns: 2047 int, file's current position in bytes. 2048 """ 2049 if not self._append: 2050 return self._io.tell() 2051 if self._read_whence: 2052 write_seek = self._io.tell() 2053 self._io.seek(self._read_seek, self._read_whence) 2054 self._read_seek = self._io.tell() 2055 self._read_whence = 0 2056 self._io.seek(write_seek) 2057 return self._read_seek 2058 2059 def _UpdateStringIO(self): 2060 """Updates the StringIO with changes to the file object contents.""" 2061 if self._file_epoch == self._file_object.epoch: 2062 return 2063 whence = self._io.tell() 2064 self._io.seek(0) 2065 self._io.truncate() 2066 self._io.write(self._file_object.contents) 2067 self._io.seek(whence) 2068 self._file_epoch = self._file_object.epoch 2069 2070 def _ReadWrappers(self, name): 2071 """Wrap a StringIO attribute in a read wrapper. 2072 2073 Returns a read_wrapper which tracks our own read pointer since the 2074 StringIO object has no concept of a different read and write pointer. 2075 2076 Args: 2077 name: the name StringIO attribute to wrap. Should be a read call. 2078 2079 Returns: 2080 either a read_error or read_wrapper function. 2081 """ 2082 io_attr = getattr(self._io, name) 2083 2084 def read_wrapper(*args, **kwargs): 2085 """Wrap all read calls to the StringIO Object. 2086 2087 We do this to track the read pointer separate from the write 2088 pointer. Anything that wants to read from the StringIO object 2089 while we're in append mode goes through this. 2090 2091 Args: 2092 *args: pass through args 2093 **kwargs: pass through kwargs 2094 Returns: 2095 Wrapped StringIO object method 2096 """ 2097 self._io.seek(self._read_seek, self._read_whence) 2098 ret_value = io_attr(*args, **kwargs) 2099 self._read_seek = self._io.tell() 2100 self._read_whence = 0 2101 self._io.seek(0, 2) 2102 return ret_value 2103 return read_wrapper 2104 2105 def _OtherWrapper(self, name): 2106 """Wrap a StringIO attribute in an other_wrapper. 2107 2108 Args: 2109 name: the name of the StringIO attribute to wrap. 2110 2111 Returns: 2112 other_wrapper which is described below. 2113 """ 2114 io_attr = getattr(self._io, name) 2115 2116 def other_wrapper(*args, **kwargs): 2117 """Wrap all other calls to the StringIO Object. 2118 2119 We do this to track changes to the write pointer. Anything that 2120 moves the write pointer in a file open for appending should move 2121 the read pointer as well. 2122 2123 Args: 2124 *args: pass through args 2125 **kwargs: pass through kwargs 2126 Returns: 2127 Wrapped StringIO object method 2128 """ 2129 write_seek = self._io.tell() 2130 ret_value = io_attr(*args, **kwargs) 2131 if write_seek != self._io.tell(): 2132 self._read_seek = self._io.tell() 2133 self._read_whence = 0 2134 self._file_object.st_size += (self._read_seek - write_seek) 2135 return ret_value 2136 return other_wrapper 2137 2138 def Size(self): 2139 return self._file_object.st_size 2140 2141 def __getattr__(self, name): 2142 if self._file_object.IsLargeFile(): 2143 raise FakeLargeFileIoException(file_path) 2144 2145 # errors on called method vs. open mode 2146 if not self._read and name.startswith('read'): 2147 def read_error(*args, **kwargs): 2148 """Throw an error unless the argument is zero.""" 2149 if args and args[0] == 0: 2150 return '' 2151 raise self._OPERATION_ERROR('File is not open for reading.') 2152 return read_error 2153 if not self._update and (name.startswith('write') 2154 or name == 'truncate'): 2155 def write_error(*args, **kwargs): 2156 """Throw an error.""" 2157 raise self._OPERATION_ERROR('File is not open for writing.') 2158 return write_error 2159 2160 if name.startswith('read'): 2161 self._UpdateStringIO() 2162 if self._append: 2163 if name.startswith('read'): 2164 return self._ReadWrappers(name) 2165 else: 2166 return self._OtherWrapper(name) 2167 return getattr(self._io, name) 2168 2169 def __iter__(self): 2170 if not self._read: 2171 raise self._OPERATION_ERROR('File is not open for reading') 2172 return self._io.__iter__() 2173 2174 # if you print obj.name, the argument to open() must be printed. Not the 2175 # abspath, not the filename, but the actual argument. 2176 file_object.opened_as = file_path 2177 2178 fakefile = FakeFileWrapper(file_object, 2179 update=need_write, 2180 read=need_read, 2181 append=append, 2182 delete_on_close=self._delete_on_close, 2183 filesystem=self.filesystem, 2184 newline=newline, 2185 binary=binary, 2186 closefd=closefd) 2187 if filedes is not None: 2188 fakefile.filedes = filedes 2189 else: 2190 fakefile.filedes = self.filesystem.AddOpenFile(fakefile) 2191 return fakefile 2192 2193 2194def _RunDoctest(): 2195 # pylint: disable-msg=C6204 2196 import doctest 2197 import fake_filesystem # pylint: disable-msg=W0406 2198 return doctest.testmod(fake_filesystem) 2199 2200 2201if __name__ == '__main__': 2202 _RunDoctest() 2203