# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os
import posixpath

from future import Future
from path_util import AssertIsDirectory, IsDirectory


class _Response(object):
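  '''A bare-bones stand-in for a urlfetch response: just content, headers and
  a 200 status code.
  '''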
  def __init__(self, content=''):
    self.content = content
    self.headers = {'Content-Type': 'none'}
    self.status_code = 200


class FakeUrlFetcher(object):
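  '''Fetches URLs from a base directory on local disk rather than over the
  network, counting sync fetches, async fetches, and async resolutions so that
  tests can assert on them via CheckAndReset().
  '''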
  def __init__(self, base_path):
    self._base_path = base_path
    # Mock capabilities. Perhaps this class should be MockUrlFetcher.
    self._sync_count = 0
    self._async_count = 0
    self._async_resolve_count = 0

  def _ReadFile(self, filename):
    # Fake DownloadError, the error that App Engine usually raises.
    class DownloadError(Exception): pass
    try:
      with open(os.path.join(self._base_path, filename), 'r') as f:
        return f.read()
    except IOError as e:
      raise DownloadError(e)

  def _ListDir(self, directory):
    # In some tests we need to test listing a directory from the HTML returned
    # by SVN. This reads an HTML file that contains the directory's listing.
    if not os.path.isdir(os.path.join(self._base_path, directory)):
      return self._ReadFile(directory[:-1])
    files = os.listdir(os.path.join(self._base_path, directory))
    html = '<html><title>Revision: 00000</title>\n'
    for filename in files:
      if filename.startswith('.'):
        continue
      if os.path.isdir(os.path.join(self._base_path, directory, filename)):
        html += '<a>' + filename + '/</a>\n'
      else:
        html += '<a>' + filename + '</a>\n'
    html += '</html>'
    return html

  def FetchAsync(self, url):
    self._async_count += 1
    url = url.rsplit('?', 1)[0]
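    # Resolution is deferred: resolve() only runs (and _async_resolve_count is
    # only incremented) when the returned Future is actually evaluated.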
    def resolve():
      self._async_resolve_count += 1
      return self._DoFetch(url)
    return Future(callback=resolve)

  def Fetch(self, url):
    self._sync_count += 1
    return self._DoFetch(url)

  def _DoFetch(self, url):
    url = url.rsplit('?', 1)[0]
    result = _Response()
    if IsDirectory(url):
      result.content = self._ListDir(url)
    else:
      result.content = self._ReadFile(url)
    return result

  def CheckAndReset(self, sync_count=0, async_count=0, async_resolve_count=0):
    '''Returns a tuple (success, error). Use in tests like:
    self.assertTrue(*fetcher.CheckAndReset(...))
    '''
    errors = []
    for desc, expected, actual in (
        ('sync_count', sync_count, self._sync_count),
        ('async_count', async_count, self._async_count),
        ('async_resolve_count', async_resolve_count,
                                self._async_resolve_count)):
      if actual != expected:
        errors.append('%s: expected %s got %s' % (desc, expected, actual))
    try:
      return (len(errors) == 0, ', '.join(errors))
    finally:
      self.Reset()

  def Reset(self):
    self._sync_count = 0
    self._async_count = 0
    self._async_resolve_count = 0


class FakeURLFSFetcher(object):
  '''Use a file_system to resolve fake fetches. Mimics the interface of Google
  App Engine's urlfetch.
  '''

  def __init__(self, file_system, base_path):
    AssertIsDirectory(base_path)
    self._base_path = base_path
    self._file_system = file_system

  def FetchAsync(self, url, **kwargs):
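    # The fetch itself happens synchronously; the result is simply wrapped in
    # an already-resolved Future.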
    return Future(value=self.Fetch(url))

  def Fetch(self, url, **kwargs):
    return _Response(self._file_system.ReadSingle(
        posixpath.join(self._base_path, url)).Get())

  def UpdateFS(self, file_system, base_path=None):
    '''Replace the underlying FileSystem used to resolve URLs.
    '''
    self._file_system = file_system
    self._base_path = base_path or self._base_path


class MockURLFetcher(object):
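  '''Wraps a real fetcher, counting calls to Fetch, FetchAsync, and async
  resolutions so that tests can assert on them via CheckAndReset().
  '''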
  def __init__(self, fetcher):
    self._fetcher = fetcher
    self.Reset()

  def Fetch(self, url, **kwargs):
    self._fetch_count += 1
    return self._fetcher.Fetch(url, **kwargs)

  def FetchAsync(self, url, **kwargs):
    self._fetch_async_count += 1
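    # Chain onto the underlying Future so that resolving it is counted
    # separately from issuing the fetch.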
    def next(result):
      self._fetch_resolve_count += 1
      return result
    return self._fetcher.FetchAsync(url, **kwargs).Then(next)

  def CheckAndReset(self,
                    fetch_count=0,
                    fetch_async_count=0,
                    fetch_resolve_count=0):
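    '''Returns a tuple (success, error). Use in tests like:
    self.assertTrue(*fetcher.CheckAndReset(...))
    '''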
    errors = []
    for desc, expected, actual in (
        ('fetch_count', fetch_count, self._fetch_count),
        ('fetch_async_count', fetch_async_count, self._fetch_async_count),
        ('fetch_resolve_count', fetch_resolve_count,
                                self._fetch_resolve_count)):
      if actual != expected:
        errors.append('%s: expected %s got %s' % (desc, expected, actual))
    try:
      return (len(errors) == 0, ', '.join(errors))
    finally:
      self.Reset()

  def Reset(self):
    self._fetch_count = 0
    self._fetch_async_count = 0
    self._fetch_resolve_count = 0