fake_url_fetcher.py revision f2477e01787aa58f445919b809d89e252beef54f
1# Copyright (c) 2012 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import os 6 7from future import Gettable, Future 8from local_file_system import LocalFileSystem 9 10 11class _Response(object): 12 def __init__(self, content=''): 13 self.content = content 14 self.headers = {'Content-Type': 'none'} 15 self.status_code = 200 16 17 18class FakeUrlFetcher(object): 19 def __init__(self, base_path): 20 self._base_path = base_path 21 # Mock capabilities. Perhaps this class should be MockUrlFetcher. 22 self._sync_count = 0 23 self._async_count = 0 24 self._async_resolve_count = 0 25 26 def _ReadFile(self, filename): 27 with open(os.path.join(self._base_path, filename), 'r') as f: 28 return f.read() 29 30 def _ListDir(self, directory): 31 # In some tests, we need to test listing a directory from the HTML returned 32 # from SVN. This reads an HTML file that has the directories HTML. 33 if not os.path.isdir(os.path.join(self._base_path, directory)): 34 return self._ReadFile(directory[:-1]) 35 files = os.listdir(os.path.join(self._base_path, directory)) 36 html = '<html><title>Revision: 00000</title>\n' 37 for filename in files: 38 if filename.startswith('.'): 39 continue 40 if os.path.isdir(os.path.join(self._base_path, directory, filename)): 41 html += '<a>' + filename + '/</a>\n' 42 else: 43 html += '<a>' + filename + '</a>\n' 44 html += '</html>' 45 return html 46 47 def FetchAsync(self, url): 48 self._async_count += 1 49 url = url.rsplit('?', 1)[0] 50 def resolve(): 51 self._async_resolve_count += 1 52 return self._DoFetch(url) 53 return Future(delegate=Gettable(resolve)) 54 55 def Fetch(self, url): 56 self._sync_count += 1 57 return self._DoFetch(url) 58 59 def _DoFetch(self, url): 60 url = url.rsplit('?', 1)[0] 61 result = _Response() 62 if url.endswith('/'): 63 result.content = self._ListDir(url) 64 else: 65 result.content = self._ReadFile(url) 66 return result 67 68 def CheckAndReset(self, sync_count=0, async_count=0, async_resolve_count=0): 69 '''Returns a tuple (success, error). Use in tests like: 70 self.assertTrue(*fetcher.CheckAndReset(...)) 71 ''' 72 errors = [] 73 for desc, expected, actual in ( 74 ('sync_count', sync_count, self._sync_count), 75 ('async_count', async_count, self._async_count), 76 ('async_resolve_count', async_resolve_count, 77 self._async_resolve_count)): 78 if actual != expected: 79 errors.append('%s: expected %s got %s' % (desc, expected, actual)) 80 try: 81 return (len(errors) == 0, ', '.join(errors)) 82 finally: 83 self.Reset() 84 85 def Reset(self): 86 self._sync_count = 0 87 self._async_count = 0 88 self._async_resolve_count = 0 89 90 91class FakeURLFSFetcher(object): 92 '''Use a file_system to resolve fake fetches. Mimics the interface of Google 93 Appengine's urlfetch. 94 ''' 95 96 def __init__(self, file_system, base_path): 97 self._base_path = base_path 98 self._file_system = file_system 99 100 def FetchAsync(self, url, **kwargs): 101 return Future(value=self.Fetch(url)) 102 103 def Fetch(self, url, **kwargs): 104 return _Response(self._file_system.ReadSingle( 105 self._base_path + '/' + url, binary=True).Get()) 106 107 108class MockURLFetcher(object): 109 def __init__(self, fetcher): 110 self._fetcher = fetcher 111 self.Reset() 112 113 def Fetch(self, url, **kwargs): 114 self._fetch_count += 1 115 return self._fetcher.Fetch(url, **kwargs) 116 117 def FetchAsync(self, url, **kwargs): 118 self._fetch_async_count += 1 119 future = self._fetcher.FetchAsync(url, **kwargs) 120 def resolve(): 121 self._fetch_resolve_count += 1 122 return future.Get() 123 return Future(delegate=Gettable(resolve)) 124 125 def CheckAndReset(self, 126 fetch_count=0, 127 fetch_async_count=0, 128 fetch_resolve_count=0): 129 errors = [] 130 for desc, expected, actual in ( 131 ('fetch_count', fetch_count, self._fetch_count), 132 ('fetch_async_count', fetch_async_count, self._fetch_async_count), 133 ('fetch_resolve_count', fetch_resolve_count, 134 self._fetch_resolve_count)): 135 if actual != expected: 136 errors.append('%s: expected %s got %s' % (desc, expected, actual)) 137 try: 138 return (len(errors) == 0, ', '.join(errors)) 139 finally: 140 self.Reset() 141 142 def Reset(self): 143 self._fetch_count = 0 144 self._fetch_async_count = 0 145 self._fetch_resolve_count = 0 146