1# Copyright (c) 2012 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import os 6 7from future import Gettable, Future 8 9 10class _Response(object): 11 def __init__(self, content=''): 12 self.content = content 13 self.headers = {'Content-Type': 'none'} 14 self.status_code = 200 15 16 17class FakeUrlFetcher(object): 18 def __init__(self, base_path): 19 self._base_path = base_path 20 # Mock capabilities. Perhaps this class should be MockUrlFetcher. 21 self._sync_count = 0 22 self._async_count = 0 23 self._async_resolve_count = 0 24 25 def _ReadFile(self, filename): 26 with open(os.path.join(self._base_path, filename), 'r') as f: 27 return f.read() 28 29 def _ListDir(self, directory): 30 # In some tests, we need to test listing a directory from the HTML returned 31 # from SVN. This reads an HTML file that has the directories HTML. 32 if not os.path.isdir(os.path.join(self._base_path, directory)): 33 return self._ReadFile(directory[:-1]) 34 files = os.listdir(os.path.join(self._base_path, directory)) 35 html = '<html><title>Revision: 00000</title>\n' 36 for filename in files: 37 if filename.startswith('.'): 38 continue 39 if os.path.isdir(os.path.join(self._base_path, directory, filename)): 40 html += '<a>' + filename + '/</a>\n' 41 else: 42 html += '<a>' + filename + '</a>\n' 43 html += '</html>' 44 return html 45 46 def FetchAsync(self, url): 47 self._async_count += 1 48 url = url.rsplit('?', 1)[0] 49 def resolve(): 50 self._async_resolve_count += 1 51 return self._DoFetch(url) 52 return Future(delegate=Gettable(resolve)) 53 54 def Fetch(self, url): 55 self._sync_count += 1 56 return self._DoFetch(url) 57 58 def _DoFetch(self, url): 59 url = url.rsplit('?', 1)[0] 60 result = _Response() 61 if url.endswith('/'): 62 result.content = self._ListDir(url) 63 else: 64 result.content = self._ReadFile(url) 65 return result 66 67 def CheckAndReset(self, sync_count=0, async_count=0, async_resolve_count=0): 68 '''Returns a tuple (success, error). Use in tests like: 69 self.assertTrue(*fetcher.CheckAndReset(...)) 70 ''' 71 errors = [] 72 for desc, expected, actual in ( 73 ('sync_count', sync_count, self._sync_count), 74 ('async_count', async_count, self._async_count), 75 ('async_resolve_count', async_resolve_count, 76 self._async_resolve_count)): 77 if actual != expected: 78 errors.append('%s: expected %s got %s' % (desc, expected, actual)) 79 try: 80 return (len(errors) == 0, ', '.join(errors)) 81 finally: 82 self.Reset() 83 84 def Reset(self): 85 self._sync_count = 0 86 self._async_count = 0 87 self._async_resolve_count = 0 88 89 90class FakeURLFSFetcher(object): 91 '''Use a file_system to resolve fake fetches. Mimics the interface of Google 92 Appengine's urlfetch. 93 ''' 94 95 def __init__(self, file_system, base_path): 96 self._base_path = base_path 97 self._file_system = file_system 98 99 def FetchAsync(self, url, **kwargs): 100 return Future(value=self.Fetch(url)) 101 102 def Fetch(self, url, **kwargs): 103 return _Response(self._file_system.ReadSingle( 104 self._base_path + '/' + url).Get()) 105 106 107class MockURLFetcher(object): 108 def __init__(self, fetcher): 109 self._fetcher = fetcher 110 self.Reset() 111 112 def Fetch(self, url, **kwargs): 113 self._fetch_count += 1 114 return self._fetcher.Fetch(url, **kwargs) 115 116 def FetchAsync(self, url, **kwargs): 117 self._fetch_async_count += 1 118 future = self._fetcher.FetchAsync(url, **kwargs) 119 def resolve(): 120 self._fetch_resolve_count += 1 121 return future.Get() 122 return Future(delegate=Gettable(resolve)) 123 124 def CheckAndReset(self, 125 fetch_count=0, 126 fetch_async_count=0, 127 fetch_resolve_count=0): 128 errors = [] 129 for desc, expected, actual in ( 130 ('fetch_count', fetch_count, self._fetch_count), 131 ('fetch_async_count', fetch_async_count, self._fetch_async_count), 132 ('fetch_resolve_count', fetch_resolve_count, 133 self._fetch_resolve_count)): 134 if actual != expected: 135 errors.append('%s: expected %s got %s' % (desc, expected, actual)) 136 try: 137 return (len(errors) == 0, ', '.join(errors)) 138 finally: 139 self.Reset() 140 141 def Reset(self): 142 self._fetch_count = 0 143 self._fetch_async_count = 0 144 self._fetch_resolve_count = 0 145