fake_url_fetcher.py revision f2477e01787aa58f445919b809d89e252beef54f
1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import os
6
7from future import Gettable, Future
8from local_file_system import LocalFileSystem
9
10
class _Response(object):
  '''Bare-bones response object handed back by the fake fetchers.

  Exposes three attributes: |content| (whatever was passed in, the empty
  string by default), |headers| (a dict holding a single 'Content-Type' entry
  set to 'none') and |status_code| (always 200).
  '''

  def __init__(self, content=''):
    self.status_code = 200
    # A fresh dict per instance, so callers may mutate it freely.
    self.headers = {'Content-Type': 'none'}
    self.content = content
16
17
class FakeUrlFetcher(object):
  '''Fake URL fetcher that answers fetches from files under a local directory.

  A URL is treated as a path relative to |base_path| once its query string has
  been dropped. A URL ending in '/' yields an HTML directory listing; any other
  URL yields the contents of the file it names. The number of sync fetches,
  async fetches and async resolutions is tracked so that tests can assert on
  them with CheckAndReset().
  '''

  def __init__(self, base_path):
    self._base_path = base_path
    # Mock capabilities. Perhaps this class should be MockUrlFetcher.
    self._sync_count = 0
    self._async_count = 0
    self._async_resolve_count = 0

  def _ReadFile(self, filename):
    '''Returns the contents of |filename|, resolved against the base path.
    Raises whatever open() raises if the file cannot be read.
    '''
    with open(os.path.join(self._base_path, filename), 'r') as f:
      return f.read()

  def _ListDir(self, directory):
    '''Returns an HTML listing for |directory| (a path ending in '/').

    Entries whose name starts with '.' are omitted and subdirectories get a
    trailing '/'. If |directory| is not a directory on disk, the file of the
    same name (without the trailing slash) is returned verbatim instead.
    '''
    full_path = os.path.join(self._base_path, directory)
    # In some tests, we need to test listing a directory from the HTML returned
    # from SVN. This reads an HTML file that holds the directory's HTML.
    if not os.path.isdir(full_path):
      return self._ReadFile(directory[:-1])
    lines = ['<html><title>Revision: 00000</title>']
    # os.listdir() returns entries in arbitrary order; sort them so that the
    # listing is identical across platforms and runs.
    for filename in sorted(os.listdir(full_path)):
      if filename.startswith('.'):
        continue
      if os.path.isdir(os.path.join(full_path, filename)):
        lines.append('<a>' + filename + '/</a>')
      else:
        lines.append('<a>' + filename + '</a>')
    lines.append('</html>')
    return '\n'.join(lines)

  def FetchAsync(self, url, **kwargs):
    '''Returns a Future which performs the fetch of |url| when resolved.

    |kwargs| are accepted for signature compatibility with the other fetchers
    in this file and are ignored.
    '''
    self._async_count += 1
    def resolve():
      self._async_resolve_count += 1
      # _DoFetch strips the query string, exactly as it does for Fetch().
      return self._DoFetch(url)
    return Future(delegate=Gettable(resolve))

  def Fetch(self, url, **kwargs):
    '''Fetches |url| synchronously and returns a _Response.

    |kwargs| are accepted for signature compatibility with the other fetchers
    in this file and are ignored.
    '''
    self._sync_count += 1
    return self._DoFetch(url)

  def _DoFetch(self, url):
    '''Maps |url| to a directory listing or file contents under the base path
    and returns it wrapped in a _Response.
    '''
    # The query string starts at the first '?'. Splitting there (rather than
    # at the last one) makes Fetch and FetchAsync resolve the same path even
    # when the URL contains more than one '?'.
    url = url.split('?', 1)[0]
    result = _Response()
    if url.endswith('/'):
      result.content = self._ListDir(url)
    else:
      result.content = self._ReadFile(url)
    return result

  def CheckAndReset(self, sync_count=0, async_count=0, async_resolve_count=0):
    '''Returns a tuple (success, error) comparing the recorded counters with
    the expected values, then resets the counters. Use in tests like:
    self.assertTrue(*fetcher.CheckAndReset(...))
    '''
    errors = []
    for desc, expected, actual in (
        ('sync_count', sync_count, self._sync_count),
        ('async_count', async_count, self._async_count),
        ('async_resolve_count', async_resolve_count,
                                self._async_resolve_count)):
      if actual != expected:
        errors.append('%s: expected %s got %s' % (desc, expected, actual))
    self.Reset()
    return (len(errors) == 0, ', '.join(errors))

  def Reset(self):
    '''Sets all of the fetch counters back to zero.'''
    self._sync_count = 0
    self._async_count = 0
    self._async_resolve_count = 0
89
90
class FakeURLFSFetcher(object):
  '''Resolves fake fetches by reading from a file_system object. Mimics the
  interface of Google Appengine's urlfetch.
  '''

  def __init__(self, file_system, base_path):
    self._file_system = file_system
    self._base_path = base_path

  def Fetch(self, url, **kwargs):
    '''Reads |url| (taken relative to the base path) from the file system in
    binary mode and returns the data wrapped in a _Response. |kwargs| are
    ignored.
    '''
    path = '/'.join((self._base_path, url))
    data = self._file_system.ReadSingle(path, binary=True).Get()
    return _Response(data)

  def FetchAsync(self, url, **kwargs):
    '''Performs the fetch immediately and returns a Future that already holds
    the response. |kwargs| are ignored.
    '''
    response = self.Fetch(url)
    return Future(value=response)
106
107
class MockURLFetcher(object):
  '''Wraps another fetcher, forwarding every call to it while counting how
  many sync fetches, async fetches and async resolutions were made.
  '''

  def __init__(self, fetcher):
    self._fetcher = fetcher
    self.Reset()

  def Reset(self):
    '''Sets all of the call counters back to zero.'''
    self._fetch_count = self._fetch_async_count = 0
    self._fetch_resolve_count = 0

  def Fetch(self, url, **kwargs):
    '''Counts the call, then delegates to the wrapped fetcher's Fetch.'''
    self._fetch_count += 1
    return self._fetcher.Fetch(url, **kwargs)

  def FetchAsync(self, url, **kwargs):
    '''Counts the call, starts the wrapped fetcher's FetchAsync and returns a
    Future which counts the resolution before yielding the wrapped result.
    '''
    self._fetch_async_count += 1
    pending = self._fetcher.FetchAsync(url, **kwargs)
    def count_and_get():
      self._fetch_resolve_count += 1
      return pending.Get()
    return Future(delegate=Gettable(count_and_get))

  def CheckAndReset(self,
                    fetch_count=0,
                    fetch_async_count=0,
                    fetch_resolve_count=0):
    '''Returns a tuple (success, error) comparing the recorded counters with
    the expected values; the counters are reset before returning.
    '''
    checks = (
        ('fetch_count', fetch_count, self._fetch_count),
        ('fetch_async_count', fetch_async_count, self._fetch_async_count),
        ('fetch_resolve_count', fetch_resolve_count,
                                self._fetch_resolve_count))
    mismatches = ['%s: expected %s got %s' % (label, want, got)
                  for label, want, got in checks
                  if got != want]
    outcome = (len(mismatches) == 0, ', '.join(mismatches))
    self.Reset()
    return outcome
146