fake_url_fetcher.py revision 4e180b6a0b4720a9b8e9e959a882386f690f08ff
1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from functools import partial
6import os
7
8from future import Gettable, Future
9from local_file_system import LocalFileSystem
10
11
12class _Response(object):
13  def __init__(self, content=''):
14    self.content = content
15    self.headers = { 'content-type': 'none' }
16    self.status_code = 200
17
18
19class FakeUrlFetcher(object):
20  def __init__(self, base_path):
21    self._base_path = base_path
22    # Mock capabilities. Perhaps this class should be MockUrlFetcher.
23    self._sync_count = 0
24    self._async_count = 0
25    self._async_resolve_count = 0
26
27  def _ReadFile(self, filename):
28    with open(os.path.join(self._base_path, filename), 'r') as f:
29      return f.read()
30
31  def _ListDir(self, directory):
32    # In some tests, we need to test listing a directory from the HTML returned
33    # from SVN. This reads an HTML file that has the directories HTML.
34    if not os.path.isdir(os.path.join(self._base_path, directory)):
35      return self._ReadFile(directory[:-1])
36    files = os.listdir(os.path.join(self._base_path, directory))
37    html = '<html><title>Revision: 00000</title>\n'
38    for filename in files:
39      if filename.startswith('.'):
40        continue
41      if os.path.isdir(os.path.join(self._base_path, directory, filename)):
42        html += '<a>' + filename + '/</a>\n'
43      else:
44        html += '<a>' + filename + '</a>\n'
45    html += '</html>'
46    return html
47
48  def FetchAsync(self, url):
49    self._async_count += 1
50    url = url.rsplit('?', 1)[0]
51    def resolve():
52      self._async_resolve_count += 1
53      return self._DoFetch(url)
54    return Future(delegate=Gettable(resolve))
55
56  def Fetch(self, url):
57    self._sync_count += 1
58    return self._DoFetch(url)
59
60  def _DoFetch(self, url):
61    url = url.rsplit('?', 1)[0]
62    result = _Response()
63    if url.endswith('/'):
64      result.content = self._ListDir(url)
65    else:
66      result.content = self._ReadFile(url)
67    return result
68
69  def CheckAndReset(self, sync_count=0, async_count=0, async_resolve_count=0):
70    '''Returns a tuple (success, error). Use in tests like:
71    self.assertTrue(*fetcher.CheckAndReset(...))
72    '''
73    errors = []
74    for desc, expected, actual in (
75        ('sync_count', sync_count, self._sync_count),
76        ('async_count', async_count, self._async_count),
77        ('async_resolve_count', async_resolve_count,
78                                self._async_resolve_count)):
79      if actual != expected:
80        errors.append('%s: expected %s got %s' % (desc, expected, actual))
81    try:
82      return (len(errors) == 0, ', '.join(errors))
83    finally:
84      self.Reset()
85
86  def Reset(self):
87    self._sync_count = 0
88    self._async_count = 0
89    self._async_resolve_count = 0
90
91
92class FakeURLFSFetcher(object):
93  '''Use a file_system to resolve fake fetches. Mimics the interface of Google
94  Appengine's urlfetch.
95  '''
96  @staticmethod
97  def Create(file_system):
98    return partial(FakeURLFSFetcher, file_system)
99
100  @staticmethod
101  def CreateLocal():
102    return partial(FakeURLFSFetcher, LocalFileSystem(''))
103
104  def __init__(self, file_system, base_path):
105    self._base_path = base_path
106    self._file_system = file_system
107
108  def FetchAsync(self, url, **kwargs):
109    return Future(value=self.Fetch(url))
110
111  def Fetch(self, url, **kwargs):
112    return _Response(self._file_system.ReadSingle(
113        self._base_path + '/' + url, binary=True).Get())
114