filesystem_mock.py revision 2daae5fd11344eaa88a0d92b0f6d65f8d2255c00
1# Copyright (C) 2009 Google Inc. All rights reserved.
2#
3# Redistribution and use in source and binary forms, with or without
4# modification, are permitted provided that the following conditions are
5# met:
6#
7#    * Redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer.
9#    * Redistributions in binary form must reproduce the above
10# copyright notice, this list of conditions and the following disclaimer
11# in the documentation and/or other materials provided with the
12# distribution.
13#    * Neither the name of Google Inc. nor the names of its
14# contributors may be used to endorse or promote products derived from
15# this software without specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29import errno
30import os
31import re
32
33from webkitpy.common.system import path
34from webkitpy.common.system import ospath
35
36
class MockFileSystem(object):
    """In-memory stand-in for a filesystem, intended for tests.

    All state lives in plain dicts on the instance:
        files: path -> contents. A value of None marks a file that
            does not exist.
        written_files: path -> contents for paths that were written,
            copied to, moved or removed through this object (None for
            paths that were removed or moved away).
        dirs: normalized directory path -> True for directories that
            were created explicitly or found to contain files.
    """

    def __init__(self, files=None, cwd='/'):
        """Initializes a "mock" filesystem that can be used to completely
        stub out a filesystem.

        Args:
            files: a dict of filenames -> file contents. A file contents
                value of None is used to indicate that the file should
                not exist.
            cwd: the initial current working directory.
        """
        self.files = files or {}
        self.written_files = {}
        self._sep = '/'
        self.current_tmpno = 0
        self.cwd = cwd
        self.dirs = {}

    def _get_sep(self):
        return self._sep

    sep = property(_get_sep, doc="pathname separator")

    def _raise_not_found(self, path):
        """Raises an IOError carrying errno.ENOENT for |path|."""
        raise IOError(errno.ENOENT, path, os.strerror(errno.ENOENT))

    def _split(self, path):
        """Splits |path| at the last separator into [dirname, basename].

        A path without any separator has an empty dirname. This is not a
        full implementation of os.path.split.
        """
        if self.sep in path:
            return path.rsplit(self.sep, 1)
        return ['', path]

    def abspath(self, path):
        """Returns a normalized version of |path|, resolved against the
        current working directory when it is not already absolute."""
        if self.isabs(path):
            return self.normpath(path)
        # Join once and normalize; recursing here would never terminate
        # if self.cwd itself were a relative path.
        return self.normpath(self.join(self.cwd, path))

    def basename(self, path):
        """Returns the part of |path| after the last separator."""
        return self._split(path)[1]

    def chdir(self, path):
        """Makes the normalized |path| the current working directory.

        Raises:
            OSError (ENOENT) if |path| is not a directory.
        """
        path = self.normpath(path)
        if not self.isdir(path):
            raise OSError(errno.ENOENT, path, os.strerror(errno.ENOENT))
        self.cwd = path

    def copyfile(self, source, destination):
        """Copies the contents of |source| to |destination|.

        Raises:
            IOError (ENOENT) if |source| does not exist.
            IOError (EISDIR) if |source| or |destination| is a directory.
        """
        if not self.exists(source):
            self._raise_not_found(source)
        if self.isdir(source):
            raise IOError(errno.EISDIR, source, os.strerror(errno.EISDIR))
        if self.isdir(destination):
            raise IOError(errno.EISDIR, destination, os.strerror(errno.EISDIR))

        self.files[destination] = self.files[source]
        self.written_files[destination] = self.files[source]

    def dirname(self, path):
        """Returns the part of |path| before the last separator."""
        return self._split(path)[0]

    def exists(self, path):
        """Returns True if |path| is an existing file or a directory."""
        return self.isfile(path) or self.isdir(path)

    def files_under(self, path, dirs_to_skip=None, file_filter=None):
        """Returns the list of file paths found at or below |path|.

        Args:
            path: a file or directory path.
            dirs_to_skip: directory basenames whose contents are ignored.
            file_filter: optional callable (fs, dirpath, basename) -> bool;
                only files for which it returns True are included.
        """
        def filter_all(fs, dirpath, basename):
            return True

        dirs_to_skip = dirs_to_skip or []
        file_filter = file_filter or filter_all
        files = []
        if self.isfile(path):
            if file_filter(self, self.dirname(path), self.basename(path)):
                files.append(path)
            return files

        if self.basename(path) in dirs_to_skip:
            return []

        if not path.endswith(self.sep):
            path += self.sep

        dir_substrings = [self.sep + d + self.sep for d in dirs_to_skip]
        for filename in self.files:
            if not filename.startswith(path):
                continue

            # Keep the separator that ends |path| so that a skipped
            # directory directly below it still matches '/name/'.
            suffix = filename[len(path) - 1:]
            if any(dir_substring in suffix for dir_substring in dir_substrings):
                continue

            dirpath, basename = self._split(filename)
            if file_filter(self, dirpath, basename):
                files.append(filename)

        return files

    def getcwd(self, path=None):
        """Returns the current working directory.

        |path| is ignored; it is accepted only so that existing callers
        that pass an argument keep working.
        """
        return self.cwd

    def glob(self, path):
        """Returns the known file paths matching |path|."""
        # FIXME: This only handles a wildcard '*' at the end of the path.
        # Maybe it should handle more?
        if path.endswith('*'):
            return [f for f in self.files if f.startswith(path[:-1])]
        return [f for f in self.files if f == path]

    def isabs(self, path):
        """Returns True if |path| starts with the separator."""
        return path.startswith(self.sep)

    def isfile(self, path):
        """Returns True if |path| is a known file whose contents are not None."""
        return path in self.files and self.files[path] is not None

    def isdir(self, path):
        """Returns True if |path| is a known directory or a directory
        prefix of some entry in self.files."""
        if path in self.files:
            return False
        path = self.normpath(path)
        if path in self.dirs:
            return True

        # Only match whole path components: '/foo' is a directory because
        # of '/foo/bar', but not because of '/foobar/baz'.
        prefix = path if path.endswith(self.sep) else path + self.sep

        # We need to use a copy of the keys here in order to avoid switching
        # to a different thread and potentially modifying the dict in
        # mid-iteration.
        files = list(self.files)
        result = any(f.startswith(prefix) for f in files)
        if result:
            # Remember the answer so later lookups skip the scan.
            self.dirs[path] = True
        return result

    def join(self, *comps):
        """Joins |comps| with os.path.join, then converts the host
        separator to the mock separator."""
        # FIXME: might want tests for this and/or a better comment about how
        # it works.
        return re.sub(re.escape(os.path.sep), self.sep, os.path.join(*comps))

    def listdir(self, path):
        """Returns the names directly inside directory |path|:
        subdirectories first, then files.

        Raises:
            OSError if |path| is not a directory.
        """
        if not self.isdir(path):
            raise OSError("%s is not a directory" % path)

        if not path.endswith(self.sep):
            path += self.sep

        dirs = []
        files = []
        for f in self.files:
            if not (self.exists(f) and f.startswith(path)):
                continue
            remaining = f[len(path):]
            if self.sep in remaining:
                # The file lives in a subdirectory; report that
                # subdirectory once.
                subdir = remaining[:remaining.index(self.sep)]
                if subdir not in dirs:
                    dirs.append(subdir)
            else:
                files.append(remaining)
        return dirs + files

    def mtime(self, path):
        """Returns 0 for any existing path.

        Raises:
            IOError (ENOENT) if |path| does not exist.
        """
        if self.exists(path):
            return 0
        self._raise_not_found(path)

    def _mktemp(self, suffix='', prefix='tmp', dir=None, **kwargs):
        """Returns a new temporary path name; nothing is created.

        Names are numbered from a per-instance counter. When |dir| is
        None they are placed under '/__im_tmp'. Extra keyword arguments
        are ignored.
        """
        if dir is None:
            dir = self.sep + '__im_tmp'
        curno = self.current_tmpno
        self.current_tmpno += 1
        return self.join(dir, "%s_%u_%s" % (prefix, curno, suffix))

    def mkdtemp(self, **kwargs):
        """Creates a temporary directory and returns an object for it.

        The returned object converts to the directory path with str()
        and can be used as a context manager: entering yields the path,
        and leaving calls rmtree() on it if it still exists. Keyword
        arguments are forwarded to _mktemp().
        """
        class TemporaryDirectory(object):
            def __init__(self, fs, **kwargs):
                self._kwargs = kwargs
                self._filesystem = fs
                self._directory_path = fs._mktemp(**kwargs)
                fs.maybe_make_directory(self._directory_path)

            def __str__(self):
                return self._directory_path

            def __enter__(self):
                return self._directory_path

            def __exit__(self, type, value, traceback):
                # Only self-delete if necessary.

                # FIXME: Should we delete non-empty directories?
                if self._filesystem.exists(self._directory_path):
                    self._filesystem.rmtree(self._directory_path)

        return TemporaryDirectory(fs=self, **kwargs)

    def maybe_make_directory(self, *path):
        """Records the directory named by joining |path|, unless it is
        already a directory."""
        norm_path = self.normpath(self.join(*path))
        if not self.isdir(norm_path):
            self.dirs[norm_path] = True

    def move(self, source, destination):
        """Moves the contents of |source| to |destination| and marks
        |source| as nonexistent.

        Raises:
            KeyError if |source| is not a known path.
            IOError (ENOENT) if |source| is marked as nonexistent.
        """
        if self.files[source] is None:
            self._raise_not_found(source)
        self.files[destination] = self.files[source]
        self.written_files[destination] = self.files[destination]
        self.files[source] = None
        self.written_files[source] = None

    def normpath(self, path):
        """Normalizes |path| with os.path.normpath."""
        # Like join(), relies on os.path functionality but normalizes the
        # path separator to the mock one.
        return re.sub(re.escape(os.path.sep), self.sep, os.path.normpath(path))

    def open_binary_tempfile(self, suffix=''):
        """Returns (writable file object, path) for a new temporary file."""
        path = self._mktemp(suffix)
        return (WritableFileObject(self, path), path)

    def open_text_file_for_writing(self, path, append=False):
        """Returns a writable file object for |path|."""
        return WritableFileObject(self, path, append)

    def read_text_file(self, path):
        """Returns the contents of |path| decoded as UTF-8."""
        return self.read_binary_file(path).decode('utf-8')

    def open_binary_file_for_reading(self, path):
        """Returns a readable file object over the contents of |path|.

        Raises:
            KeyError if |path| is not a known path.
            IOError (ENOENT) if |path| is marked as nonexistent.
        """
        if self.files[path] is None:
            self._raise_not_found(path)
        return ReadableFileObject(self, path, self.files[path])

    def read_binary_file(self, path):
        """Returns the stored contents of |path|.

        Raises:
            KeyError if |path| is not a known path.
            IOError (ENOENT) if |path| is marked as nonexistent.
        """
        # Intentionally raises KeyError if we don't recognize the path.
        if self.files[path] is None:
            self._raise_not_found(path)
        return self.files[path]

    def relpath(self, path, start='.'):
        """Returns |path| relative to |start|, delegating to ospath.relpath."""
        return ospath.relpath(path, start, self.abspath, self.sep)

    def remove(self, path):
        """Marks the file |path| as nonexistent.

        Raises:
            KeyError if |path| is not a known path.
            IOError (ENOENT) if |path| is already marked as nonexistent.
        """
        if self.files[path] is None:
            self._raise_not_found(path)
        self.files[path] = None
        self.written_files[path] = None

    def rmtree(self, path):
        """Marks every known file below directory |path| as nonexistent."""
        if not path.endswith(self.sep):
            path += self.sep

        for f in self.files:
            if f.startswith(path):
                self.files[f] = None

    def splitext(self, path):
        """Splits |path| into (root, extension).

        The extension starts at the last '.' of the final path component
        and is empty when that component has no '.'.
        """
        idx = path.rfind('.')
        # A missing dot, or a dot that belongs to a directory component,
        # means there is no extension.
        if idx == -1 or idx < path.rfind(self.sep):
            idx = len(path)
        return (path[0:idx], path[idx:])

    def write_text_file(self, path, contents):
        """Stores |contents| at |path|, encoded as UTF-8."""
        return self.write_binary_file(path, contents.encode('utf-8'))

    def write_binary_file(self, path, contents):
        """Stores |contents| at |path| and records the write in
        written_files."""
        self.files[path] = contents
        self.written_files[path] = contents
290
class WritableFileObject(object):
    """Minimal writable file-like object backed by a mock filesystem.

    Written data is appended to fs.files[path] and mirrored into
    fs.written_files[path].
    """

    def __init__(self, fs, path, append=False, encoding=None):
        """Opens |path| on |fs| for writing.

        Args:
            fs: object exposing 'files' and 'written_files' dicts.
            path: key under which the contents are stored.
            append: if True, keep any existing contents and add to them;
                otherwise start from empty contents.
            encoding: accepted but not used.
        """
        self.fs = fs
        self.path = path
        self.closed = False
        # Start from an empty string unless we are appending to real
        # contents. A missing key or a None entry has nothing to append
        # to, and leaving None in place would make write() fail.
        if not append or self.fs.files.get(path) is None:
            self.fs.files[path] = ""

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def close(self):
        """Marks the object as closed."""
        self.closed = True

    def write(self, str):
        """Appends |str| to the stored contents and records the result
        in written_files."""
        self.fs.files[self.path] += str
        self.fs.written_files[self.path] = self.fs.files[self.path]
311
312
class ReadableFileObject(object):
    """Minimal readable file-like object over an in-memory value."""

    def __init__(self, fs, path, data=""):
        """Wraps |data| for sequential reading.

        Args:
            fs: the owning filesystem object (stored, not otherwise used here).
            path: the path the data came from (stored, not otherwise used here).
            data: the sliceable contents to read from.
        """
        self.fs = fs
        self.path = path
        self.closed = False
        self.data = data
        self.offset = 0

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def close(self):
        """Marks the object as closed."""
        self.closed = True

    def read(self, bytes=None):
        """Returns up to |bytes| items from the current position and
        advances past them.

        With |bytes| None or negative, returns everything that remains.
        Once the data is exhausted, further reads return an empty slice.
        """
        start = self.offset
        total = len(self.data)
        if bytes is None or bytes < 0:
            end = total
        else:
            # Never move the offset past the end of the data.
            end = min(start + bytes, total)
        self.offset = max(start, end)
        return self.data[start:self.offset]
336