1# Copyright (c) 2012 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import os 6import posixpath 7 8from future import Future 9from path_util import AssertIsDirectory, IsDirectory 10 11 12class _Response(object): 13 def __init__(self, content=''): 14 self.content = content 15 self.headers = {'Content-Type': 'none'} 16 self.status_code = 200 17 18 19class FakeUrlFetcher(object): 20 def __init__(self, base_path): 21 self._base_path = base_path 22 # Mock capabilities. Perhaps this class should be MockUrlFetcher. 23 self._sync_count = 0 24 self._async_count = 0 25 self._async_resolve_count = 0 26 27 def _ReadFile(self, filename): 28 # Fake DownloadError, the error that appengine usually raises. 29 class DownloadError(Exception): pass 30 try: 31 with open(os.path.join(self._base_path, filename), 'r') as f: 32 return f.read() 33 except IOError as e: 34 raise DownloadError(e) 35 36 def _ListDir(self, directory): 37 # In some tests, we need to test listing a directory from the HTML returned 38 # from SVN. This reads an HTML file that has the directories HTML. 39 if not os.path.isdir(os.path.join(self._base_path, directory)): 40 return self._ReadFile(directory[:-1]) 41 files = os.listdir(os.path.join(self._base_path, directory)) 42 html = '<html><title>Revision: 00000</title>\n' 43 for filename in files: 44 if filename.startswith('.'): 45 continue 46 if os.path.isdir(os.path.join(self._base_path, directory, filename)): 47 html += '<a>' + filename + '/</a>\n' 48 else: 49 html += '<a>' + filename + '</a>\n' 50 html += '</html>' 51 return html 52 53 def FetchAsync(self, url): 54 self._async_count += 1 55 url = url.rsplit('?', 1)[0] 56 def resolve(): 57 self._async_resolve_count += 1 58 return self._DoFetch(url) 59 return Future(callback=resolve) 60 61 def Fetch(self, url): 62 self._sync_count += 1 63 return self._DoFetch(url) 64 65 def _DoFetch(self, url): 66 url = url.rsplit('?', 1)[0] 67 result = _Response() 68 if IsDirectory(url): 69 result.content = self._ListDir(url) 70 else: 71 result.content = self._ReadFile(url) 72 return result 73 74 def CheckAndReset(self, sync_count=0, async_count=0, async_resolve_count=0): 75 '''Returns a tuple (success, error). Use in tests like: 76 self.assertTrue(*fetcher.CheckAndReset(...)) 77 ''' 78 errors = [] 79 for desc, expected, actual in ( 80 ('sync_count', sync_count, self._sync_count), 81 ('async_count', async_count, self._async_count), 82 ('async_resolve_count', async_resolve_count, 83 self._async_resolve_count)): 84 if actual != expected: 85 errors.append('%s: expected %s got %s' % (desc, expected, actual)) 86 try: 87 return (len(errors) == 0, ', '.join(errors)) 88 finally: 89 self.Reset() 90 91 def Reset(self): 92 self._sync_count = 0 93 self._async_count = 0 94 self._async_resolve_count = 0 95 96 97class FakeURLFSFetcher(object): 98 '''Use a file_system to resolve fake fetches. Mimics the interface of Google 99 Appengine's urlfetch. 100 ''' 101 102 def __init__(self, file_system, base_path): 103 AssertIsDirectory(base_path) 104 self._base_path = base_path 105 self._file_system = file_system 106 107 def FetchAsync(self, url, **kwargs): 108 return Future(value=self.Fetch(url)) 109 110 def Fetch(self, url, **kwargs): 111 return _Response(self._file_system.ReadSingle( 112 posixpath.join(self._base_path, url)).Get()) 113 114 def UpdateFS(self, file_system, base_path=None): 115 '''Replace the underlying FileSystem used to reslove URLs. 116 ''' 117 self._file_system = file_system 118 self._base_path = base_path or self._base_path 119 120 121class MockURLFetcher(object): 122 def __init__(self, fetcher): 123 self._fetcher = fetcher 124 self.Reset() 125 126 def Fetch(self, url, **kwargs): 127 self._fetch_count += 1 128 return self._fetcher.Fetch(url, **kwargs) 129 130 def FetchAsync(self, url, **kwargs): 131 self._fetch_async_count += 1 132 def next(result): 133 self._fetch_resolve_count += 1 134 return result 135 return self._fetcher.FetchAsync(url, **kwargs).Then(next) 136 137 def CheckAndReset(self, 138 fetch_count=0, 139 fetch_async_count=0, 140 fetch_resolve_count=0): 141 errors = [] 142 for desc, expected, actual in ( 143 ('fetch_count', fetch_count, self._fetch_count), 144 ('fetch_async_count', fetch_async_count, self._fetch_async_count), 145 ('fetch_resolve_count', fetch_resolve_count, 146 self._fetch_resolve_count)): 147 if actual != expected: 148 errors.append('%s: expected %s got %s' % (desc, expected, actual)) 149 try: 150 return (len(errors) == 0, ', '.join(errors)) 151 finally: 152 self.Reset() 153 154 def Reset(self): 155 self._fetch_count = 0 156 self._fetch_async_count = 0 157 self._fetch_resolve_count = 0 158