# fake_url_fetcher.py revision 4e180b6a0b4720a9b8e9e959a882386f690f08ff
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from functools import partial
import os

from future import Gettable, Future
from local_file_system import LocalFileSystem


class _Response(object):
  '''Simple response object returned by the fake fetchers in this file.

  Carries the fetched |content|, a fixed 'content-type' header of 'none' and
  a fixed status code of 200.
  '''
  def __init__(self, content=''):
    self.content = content
    self.headers = { 'content-type': 'none' }
    self.status_code = 200


class FakeUrlFetcher(object):
  '''Resolves fetches by reading files and directories under |base_path| on
  the local disk, and counts how often each fetch entry point is used so
  that tests can verify the counts with CheckAndReset().
  '''
  def __init__(self, base_path):
    self._base_path = base_path
    # Mock capabilities. Perhaps this class should be MockUrlFetcher.
    self._sync_count = 0
    self._async_count = 0
    self._async_resolve_count = 0

  def _ReadFile(self, filename):
    '''Returns the text content of |filename|, resolved against the base
    path. Errors raised by open() propagate to the caller.
    '''
    with open(os.path.join(self._base_path, filename), 'r') as f:
      return f.read()

  def _ListDir(self, directory):
    '''Returns an HTML listing of |directory|, resolved against the base path.

    If |directory| is not a directory on disk, its last character (the
    trailing '/' when called from _DoFetch) is dropped and the file with that
    name is returned instead.
    '''
    # In some tests, we need to test listing a directory from the HTML returned
    # from SVN. This reads an HTML file that has the directories HTML.
    full_path = os.path.join(self._base_path, directory)
    if not os.path.isdir(full_path):
      return self._ReadFile(directory[:-1])
    lines = ['<html><title>Revision: 00000</title>']
    # os.listdir() returns entries in arbitrary order; sort them so that the
    # generated listing is deterministic.
    for filename in sorted(os.listdir(full_path)):
      # Skip hidden entries.
      if filename.startswith('.'):
        continue
      if os.path.isdir(os.path.join(full_path, filename)):
        # Sub-directories are marked with a trailing slash.
        lines.append('<a>' + filename + '/</a>')
      else:
        lines.append('<a>' + filename + '</a>')
    lines.append('</html>')
    return '\n'.join(lines)

  def FetchAsync(self, url):
    '''Returns a Future whose delegate performs the fetch of |url|.

    Increments the async count immediately; the async-resolve count is only
    incremented when the delegate itself runs.
    '''
    self._async_count += 1
    # The query string is stripped here and once more inside _DoFetch, so a
    # URL containing two '?' loses both trailing segments on this path.
    url = url.rsplit('?', 1)[0]
    def resolve():
      self._async_resolve_count += 1
      return self._DoFetch(url)
    return Future(delegate=Gettable(resolve))

  def Fetch(self, url):
    '''Fetches |url| synchronously, increments the sync count and returns a
    _Response.
    '''
    self._sync_count += 1
    return self._DoFetch(url)

  def _DoFetch(self, url):
    '''Returns a _Response for |url| with everything after the last '?'
    removed. URLs ending in '/' produce a directory listing, anything else
    the content of the file.
    '''
    url = url.rsplit('?', 1)[0]
    result = _Response()
    if url.endswith('/'):
      result.content = self._ListDir(url)
    else:
      result.content = self._ReadFile(url)
    return result

  def CheckAndReset(self, sync_count=0, async_count=0, async_resolve_count=0):
    '''Returns a tuple (success, error). Use in tests like:
    self.assertTrue(*fetcher.CheckAndReset(...))

    |success| is True when every counter equals its expected value; |error|
    is a comma separated description of the mismatches (empty on success).
    The counters are reset regardless of the outcome.
    '''
    errors = [
        '%s: expected %s got %s' % (desc, expected, actual)
        for desc, expected, actual in (
            ('sync_count', sync_count, self._sync_count),
            ('async_count', async_count, self._async_count),
            ('async_resolve_count', async_resolve_count,
             self._async_resolve_count))
        if actual != expected]
    # The counters were captured above, so it is safe to reset before
    # returning.
    self.Reset()
    return (not errors, ', '.join(errors))

  def Reset(self):
    '''Sets all fetch counters back to zero.'''
    self._sync_count = 0
    self._async_count = 0
    self._async_resolve_count = 0


class FakeURLFSFetcher(object):
  '''Use a file_system to resolve fake fetches. Mimics the interface of Google
  Appengine's urlfetch.
  '''
  @staticmethod
  def Create(file_system):
    '''Returns a factory that takes a base path and builds a FakeURLFSFetcher
    backed by |file_system|.
    '''
    return partial(FakeURLFSFetcher, file_system)

  @staticmethod
  def CreateLocal():
    '''Like Create(), but backed by LocalFileSystem('').'''
    return partial(FakeURLFSFetcher, LocalFileSystem(''))

  def __init__(self, file_system, base_path):
    self._base_path = base_path
    self._file_system = file_system

  def FetchAsync(self, url, **kwargs):
    '''Performs the fetch immediately and returns it wrapped in a Future.'''
    return Future(value=self.Fetch(url, **kwargs))

  def Fetch(self, url, **kwargs):
    '''Returns a _Response holding the binary content the file system reads
    for |url| under the base path. |kwargs| are accepted and ignored.
    '''
    return _Response(self._file_system.ReadSingle(
        self._base_path + '/' + url, binary=True).Get())