1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import os
6
7from future import Gettable, Future
8
9
class _Response(object):
  '''Response object handed back by the fake fetchers in this file.

  |content| holds the body passed to the constructor (empty by default),
  |headers| is a single-entry dict mapping 'Content-Type' to 'none', and
  |status_code| is always 200.
  '''

  def __init__(self, content=''):
    self.status_code = 200
    self.headers = {'Content-Type': 'none'}
    self.content = content
15
16
class FakeUrlFetcher(object):
  '''Serves fetches from files on local disk and counts how it is used.

  Every URL is interpreted as a path relative to |base_path|, with any query
  string ignored. A URL ending in '/' yields an HTML directory listing; any
  other URL yields the contents of the file. Counters for synchronous fetches,
  asynchronous fetches and resolved asynchronous fetches are kept so that
  tests can verify them with CheckAndReset().
  '''

  def __init__(self, base_path):
    self._base_path = base_path
    # Mock capabilities. Perhaps this class should be MockUrlFetcher.
    self._sync_count = 0
    self._async_count = 0
    self._async_resolve_count = 0

  @staticmethod
  def _StripQuery(url):
    '''Returns |url| without its query string.

    The query starts at the first '?' and may itself contain further '?'
    characters, so everything from the first '?' onwards is dropped. Applying
    this more than once gives the same result as applying it once.
    '''
    return url.split('?', 1)[0]

  def _ReadFile(self, filename):
    '''Returns the contents of |filename|, resolved against the base path.'''
    with open(os.path.join(self._base_path, filename), 'r') as f:
      return f.read()

  def _ListDir(self, directory):
    '''Returns an HTML listing for |directory| (a path ending in '/').

    If |directory| is a real directory under the base path, the listing is
    generated from its entries: names starting with '.' are skipped and
    sub-directories get a trailing '/'. Otherwise the trailing character is
    dropped and the file of that name is returned verbatim.
    '''
    # In some tests, we need to test listing a directory from the HTML returned
    # from SVN. In that case the "directory" is really a file that already
    # contains the listing HTML, so just read it.
    full_path = os.path.join(self._base_path, directory)
    if not os.path.isdir(full_path):
      return self._ReadFile(directory[:-1])
    lines = ['<html><title>Revision: 00000</title>']
    # os.listdir() returns entries in arbitrary order; sort them so that the
    # generated listing is identical on every platform and every run.
    for filename in sorted(os.listdir(full_path)):
      if filename.startswith('.'):
        continue
      if os.path.isdir(os.path.join(full_path, filename)):
        filename += '/'
      lines.append('<a>' + filename + '</a>')
    lines.append('</html>')
    return '\n'.join(lines)

  def FetchAsync(self, url):
    '''Counts an asynchronous fetch of |url| and returns a Future for it.

    The Future is built around a delegate which, each time it runs, bumps the
    resolve counter and then performs the actual fetch.
    '''
    self._async_count += 1
    # _DoFetch strips the query again; that is harmless because stripping is
    # idempotent, so Fetch and FetchAsync always resolve to the same path.
    url = self._StripQuery(url)
    def resolve():
      self._async_resolve_count += 1
      return self._DoFetch(url)
    return Future(delegate=Gettable(resolve))

  def Fetch(self, url):
    '''Counts a synchronous fetch of |url| and returns its response.'''
    self._sync_count += 1
    return self._DoFetch(url)

  def _DoFetch(self, url):
    '''Builds the response for |url| without touching any counter.'''
    url = self._StripQuery(url)
    result = _Response()
    if url.endswith('/'):
      result.content = self._ListDir(url)
    else:
      result.content = self._ReadFile(url)
    return result

  def CheckAndReset(self, sync_count=0, async_count=0, async_resolve_count=0):
    '''Returns a tuple (success, error). Use in tests like:
    self.assertTrue(*fetcher.CheckAndReset(...))

    |success| is True when every counter equals its expected value; |error|
    is a comma-separated description of the counters that did not match (empty
    on success). All counters are reset to zero before returning.
    '''
    errors = [
        '%s: expected %s got %s' % (desc, expected, actual)
        for desc, expected, actual in (
            ('sync_count', sync_count, self._sync_count),
            ('async_count', async_count, self._async_count),
            ('async_resolve_count', async_resolve_count,
                                    self._async_resolve_count))
        if actual != expected]
    self.Reset()
    return (not errors, ', '.join(errors))

  def Reset(self):
    '''Sets all usage counters back to zero.'''
    self._sync_count = 0
    self._async_count = 0
    self._async_resolve_count = 0
88
89
class FakeURLFSFetcher(object):
  '''Resolves fake fetches by reading from a file_system object. Mimics the
  interface of Google Appengine's urlfetch.
  '''

  def __init__(self, file_system, base_path):
    self._file_system = file_system
    self._base_path = base_path

  def FetchAsync(self, url, **kwargs):
    '''Performs the fetch immediately and returns it wrapped in a Future.

    Extra keyword arguments are accepted but not used.
    '''
    response = self.Fetch(url)
    return Future(value=response)

  def Fetch(self, url, **kwargs):
    '''Reads |url|, prefixed with the base path, from the file system.

    Extra keyword arguments are accepted but not used.
    '''
    full_path = '/'.join((self._base_path, url))
    content = self._file_system.ReadSingle(full_path).Get()
    return _Response(content)
105
106
class MockURLFetcher(object):
  '''Wraps another fetcher, forwarding every call to it while counting the
  synchronous fetches, asynchronous fetches and asynchronous resolves that
  pass through.
  '''

  def __init__(self, fetcher):
    self._fetcher = fetcher
    self.Reset()

  def Fetch(self, url, **kwargs):
    '''Counts one synchronous fetch and forwards it to the wrapped fetcher.'''
    self._fetch_count += 1
    return self._fetcher.Fetch(url, **kwargs)

  def FetchAsync(self, url, **kwargs):
    '''Counts one asynchronous fetch and forwards it to the wrapped fetcher.

    The wrapped fetcher's future is hidden behind a new Future whose delegate
    bumps the resolve counter each time it runs, before calling Get() on the
    wrapped future.
    '''
    self._fetch_async_count += 1
    wrapped_future = self._fetcher.FetchAsync(url, **kwargs)
    def count_and_get():
      self._fetch_resolve_count += 1
      return wrapped_future.Get()
    return Future(delegate=Gettable(count_and_get))

  def CheckAndReset(self,
                    fetch_count=0,
                    fetch_async_count=0,
                    fetch_resolve_count=0):
    '''Compares the counters with the expected values, then resets them.

    Returns a tuple (success, error) where |error| lists every counter that
    did not match, separated by ', ', and is empty on success.
    '''
    expectations = (
        ('fetch_count', fetch_count, self._fetch_count),
        ('fetch_async_count', fetch_async_count, self._fetch_async_count),
        ('fetch_resolve_count', fetch_resolve_count,
                                self._fetch_resolve_count))
    mismatches = ['%s: expected %s got %s' % (label, want, got)
                  for label, want, got in expectations
                  if got != want]
    outcome = (len(mismatches) == 0, ', '.join(mismatches))
    self.Reset()
    return outcome

  def Reset(self):
    '''Zeroes all three counters.'''
    self._fetch_count = 0
    self._fetch_async_count = 0
    self._fetch_resolve_count = 0
145