1# Copyright 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Provides stubs for os, sys and subprocess for testing
6
This module allows one to test code that itself uses os, sys, and subprocess.
8"""
9
10import ntpath
11import os
12import posixpath
13import re
14import shlex
15import sys
16
17
class Override(object):
  """Replaces attributes of a module with stubs until Restore() is called.

  For every name in module_list a new stub instance is created, stored on
  this object under that name (e.g. self.os) and set on base_module under the
  same name. Restore() puts the previous values back; __del__ asserts that
  Restore() has been called.
  """

  # Marks a name that base_module did not define before it was overridden.
  # A dedicated sentinel is used instead of None so that an attribute whose
  # real value is None is restored rather than deleted.
  _MISSING = object()

  def __init__(self, base_module, module_list):
    """Installs the requested stubs on base_module.

    Args:
      base_module: the module (or any object) whose attributes are replaced.
      module_list: iterable of names; each must be a key of the stub table
          below.

    Raises:
      KeyError: if a name has no stub. Overrides already installed by this
          call are undone before the exception propagates.
    """
    stubs = {'cloud_storage': CloudStorageModuleStub,
             'open': OpenFunctionStub,
             'os': OsModuleStub,
             'perf_control': PerfControlModuleStub,
             'raw_input': RawInputFunctionStub,
             'subprocess': SubprocessModuleStub,
             'sys': SysModuleStub,
             'thermal_throttle': ThermalThrottleModuleStub,
             'logging': LoggingStub,
    }
    self.adb_commands = None
    self.cloud_storage = None
    self.logging = None
    self.open = None
    self.os = None
    self.perf_control = None
    self.raw_input = None
    self.subprocess = None
    self.sys = None
    self.thermal_throttle = None

    self._base_module = base_module
    self._overrides = {}

    try:
      for module_name in module_list:
        # Build the stub first so an unknown name never leaves a stale entry
        # in self._overrides.
        stub = stubs[module_name]()
        # setdefault keeps the true original if a name is listed twice.
        self._overrides.setdefault(
            module_name, getattr(base_module, module_name, self._MISSING))
        setattr(self, module_name, stub)
        setattr(base_module, module_name, stub)
    except Exception:
      self.Restore()
      raise

    if self.os and self.sys:
      # Make the os.path stub consult the stubbed sys.platform.
      self.os.path.sys = self.sys

  def __del__(self):
    assert not self._overrides

  def Restore(self):
    """Puts back every attribute replaced by __init__ and forgets them."""
    for module_name, original_module in self._overrides.items():
      if original_module is self._MISSING:
        # This will happen when we override built-in functions, like open.
        # If we don't delete the attribute, we will shadow the built-in
        # function with the stub.
        delattr(self._base_module, module_name)
      else:
        setattr(self._base_module, module_name, original_module)
    self._overrides = {}
68
69
class AdbDevice(object):
  """Fake adb device whose answers are configured through its attributes."""

  def __init__(self):
    self.has_root = False
    self.needs_su = False
    # Maps the first token of a shell command to a callable that receives the
    # full argument list and produces the command's result.
    self.shell_command_handlers = {}
    # Returned unchanged by ReadFile() for every path.
    self.mock_content = []
    self.system_properties = {'ro.product.cpu.abi': 'armeabi-v7a'}

  def HasRoot(self):
    """Returns the has_root attribute."""
    return self.has_root

  def NeedsSU(self):
    """Returns the needs_su attribute."""
    return self.needs_su

  def RunShellCommand(self, args, **kwargs):
    """Dispatches a command to the handler registered for its first token.

    Args:
      args: either a command string (split with shlex) or a list of arguments.
      **kwargs: accepted and ignored.

    Returns:
      Whatever the registered handler returns for the argument list.

    Raises:
      KeyError: if no handler is registered for args[0].
    """
    del kwargs  # unused
    try:
      string_types = basestring  # pylint: disable=undefined-variable
    except NameError:
      # Python 3 has no basestring; str is the only text type there.
      string_types = str
    if isinstance(args, string_types):
      args = shlex.split(args)
    handler = self.shell_command_handlers[args[0]]
    return handler(args)

  def FileExists(self, _):
    """Always reports that the file does not exist."""
    return False

  def ReadFile(self, device_path, as_root=False):
    """Returns mock_content regardless of the requested path."""
    del device_path, as_root  # unused
    return self.mock_content

  def GetProp(self, property_name):
    """Returns a stored system property; raises KeyError if it is not set."""
    return self.system_properties[property_name]

  def SetProp(self, property_name, property_value):
    """Stores property_value under property_name."""
    self.system_properties[property_name] = property_value
106
107
class CloudStorageModuleStub(object):
  """In-memory stand-in for the cloud_storage module.

  Remote state is a dict mapping bucket name to a {remote_path: hash} dict.
  Local state is two dicts: local_file_hashes maps data file paths to their
  hashes and local_hash_files maps hash file paths to the hash they contain.
  """
  PUBLIC_BUCKET = 'chromium-telemetry'
  PARTNER_BUCKET = 'chrome-partner-telemetry'
  INTERNAL_BUCKET = 'chrome-telemetry'
  BUCKET_ALIASES = {
    'public': PUBLIC_BUCKET,
    'partner': PARTNER_BUCKET,
    'internal': INTERNAL_BUCKET,
  }

  # These are used to test for CloudStorage errors.
  INTERNAL_PERMISSION = 2
  PARTNER_PERMISSION = 1
  PUBLIC_PERMISSION = 0
  # Not logged in.
  CREDENTIALS_ERROR_PERMISSION = -1

  class NotFoundError(Exception):
    pass

  class CloudStorageError(Exception):
    pass

  class PermissionError(CloudStorageError):
    pass

  class CredentialsError(CloudStorageError):
    pass

  def __init__(self):
    self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{},
                                 CloudStorageModuleStub.PARTNER_BUCKET:{},
                                 CloudStorageModuleStub.PUBLIC_BUCKET:{}}
    # Work on a copy so that Insert() and ChangeRemoteHashForTesting() never
    # modify the defaults that SetRemotePathsForTesting() falls back to.
    self.remote_paths = self._CopyDefaultRemotePaths()
    self.local_file_hashes = {}
    self.local_hash_files = {}
    self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION
    self.downloaded_files = []

  def _CopyDefaultRemotePaths(self):
    """Returns a new {bucket: {remote_path: hash}} dict built from defaults."""
    return dict((bucket, dict(paths))
                for bucket, paths in self.default_remote_paths.items())

  def SetPermissionLevelForTesting(self, permission_level):
    """Sets the permission level used by CheckPermissionLevelForBucket."""
    self.permission_level = permission_level

  def CheckPermissionLevelForBucket(self, bucket):
    """Raises if the current permission level may not access bucket.

    The public bucket is always accessible. Otherwise a credentials-error
    permission level raises CredentialsError, the partner and internal
    buckets raise PermissionError when the level is below their threshold,
    and any other bucket missing from remote_paths raises NotFoundError.
    """
    if bucket == CloudStorageModuleStub.PUBLIC_BUCKET:
      return
    elif (self.permission_level ==
          CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION):
      raise CloudStorageModuleStub.CredentialsError()
    elif bucket == CloudStorageModuleStub.PARTNER_BUCKET:
      if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION:
        raise CloudStorageModuleStub.PermissionError()
    elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET:
      if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION:
        raise CloudStorageModuleStub.PermissionError()
    elif bucket not in self.remote_paths:
      raise CloudStorageModuleStub.NotFoundError()

  def SetRemotePathsForTesting(self, remote_path_dict=None):
    """Replaces the remote state; an empty or missing dict resets to defaults.

    A supplied dict is stored by reference, so later inserts modify it.
    """
    if not remote_path_dict:
      self.remote_paths = self._CopyDefaultRemotePaths()
      return
    self.remote_paths = remote_path_dict

  def GetRemotePathsForTesting(self):
    """Returns the remote state, resetting it to the defaults if it is empty."""
    if not self.remote_paths:
      self.remote_paths = self._CopyDefaultRemotePaths()
    return self.remote_paths

  # Set a dictionary of data files and their "calculated" hashes.
  def SetCalculatedHashesForTesting(self, calculated_hash_dictionary):
    self.local_file_hashes = calculated_hash_dictionary

  def GetLocalDataFiles(self):
    """Returns a list of the known local data file paths."""
    return list(self.local_file_hashes.keys())

  # Set a dictionary of hash files and the hashes they should contain.
  def SetHashFileContentsForTesting(self, hash_file_dictionary):
    self.local_hash_files = hash_file_dictionary

  def GetLocalHashFiles(self):
    """Returns a list of the known local hash file paths."""
    return list(self.local_hash_files.keys())

  def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash):
    """Sets the hash stored for remote_path in bucket."""
    self.remote_paths[bucket][remote_path] = new_hash

  def List(self, bucket):
    """Returns the remote paths stored in bucket.

    Raises:
      CloudStorageError: if bucket is empty or unknown.
      Anything raised by CheckPermissionLevelForBucket.
    """
    if not bucket or bucket not in self.remote_paths:
      bucket_error = ('Incorrect bucket specified, correct buckets:' +
                      str(self.remote_paths))
      raise CloudStorageModuleStub.CloudStorageError(bucket_error)
    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
    return list(self.remote_paths[bucket].keys())

  def Exists(self, bucket, remote_path):
    """Returns whether remote_path is stored in bucket."""
    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
    return remote_path in self.remote_paths[bucket]

  def Insert(self, bucket, remote_path, local_path):
    """Stores the hash of local_path under remote_path and returns remote_path.

    Raises:
      CloudStorageError: if local_path is not a known local data file.
    """
    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
    if local_path not in self.GetLocalDataFiles():
      file_path_error = 'Local file path does not exist'
      raise CloudStorageModuleStub.CloudStorageError(file_path_error)
    self.remote_paths[bucket][remote_path] = (
      CloudStorageModuleStub.CalculateHash(self, local_path))
    return remote_path

  def GetHelper(self, bucket, remote_path, local_path, only_if_changed):
    """Simulates downloading remote_path from bucket to local_path.

    Returns:
      False when only_if_changed is set and either the remote file is missing
      or its hash equals the local one; otherwise the remote hash, after
      recording the download and updating the local hash dictionaries.

    Raises:
      NotFoundError: if the remote file is missing and only_if_changed is
          not set.
    """
    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
    if remote_path not in self.remote_paths[bucket]:
      if only_if_changed:
        return False
      raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.')
    remote_hash = self.remote_paths[bucket][remote_path]
    # A destination that has no recorded hash yet simply counts as different
    # from the remote copy instead of raising KeyError.
    local_hash = self.local_file_hashes.get(local_path)
    if only_if_changed and remote_hash == local_hash:
      return False
    self.downloaded_files.append(remote_path)
    self.local_file_hashes[local_path] = remote_hash
    self.local_hash_files[local_path + '.sha1'] = remote_hash
    return remote_hash

  def Get(self, bucket, remote_path, local_path):
    """Unconditional download; see GetHelper."""
    return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
                                            local_path, False)

  def GetIfChanged(self, local_path, bucket=None):
    """Downloads the file named like local_path if its hash differs.

    Without a bucket, the public, partner and internal buckets are tried in
    that order until one yields a result.
    """
    remote_path = os.path.basename(local_path)
    if bucket:
      return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
                                              local_path, True)
    result = CloudStorageModuleStub.GetHelper(
        self, self.PUBLIC_BUCKET, remote_path, local_path, True)
    if not result:
      result = CloudStorageModuleStub.GetHelper(
          self, self.PARTNER_BUCKET, remote_path, local_path, True)
    if not result:
      result = CloudStorageModuleStub.GetHelper(
          self, self.INTERNAL_BUCKET, remote_path, local_path, True)
    return result

  def GetFilesInDirectoryIfChanged(self, directory, bucket):
    """Calls GetIfChanged for every .sha1 file found under directory.

    Raises:
      ValueError: if directory is a filesystem root.
    """
    if os.path.dirname(directory) == directory: # If in the root dir.
      raise ValueError('Trying to serve root directory from HTTP server.')
    for dirpath, _, filenames in os.walk(directory):
      for filename in filenames:
        path, extension = os.path.splitext(
            os.path.join(dirpath, filename))
        if extension != '.sha1':
          continue
        self.GetIfChanged(path, bucket)

  def CalculateHash(self, file_path):
    """Returns the preset hash of file_path; raises KeyError if unknown."""
    return self.local_file_hashes[file_path]

  def ReadHash(self, hash_path):
    """Returns the preset content of hash_path; raises KeyError if unknown."""
    return self.local_hash_files[hash_path]
264
265
class LoggingStub(object):
  """Records formatted warning and error messages instead of logging them."""

  def __init__(self):
    self.warnings = []
    self.errors = []

  @staticmethod
  def _Format(msg, args):
    """Applies %-formatting only when arguments were supplied.

    This mirrors the logging module, where a message logged without
    arguments may contain literal '%' characters.
    """
    return msg % args if args else msg

  def info(self, msg, *args):
    """Discards the message."""
    pass

  def error(self, msg, *args):
    """Appends the formatted message to self.errors."""
    self.errors.append(self._Format(msg, args))

  def warning(self, msg, *args):
    """Appends the formatted message to self.warnings."""
    self.warnings.append(self._Format(msg, args))

  def warn(self, msg, *args):
    """Alias for warning()."""
    self.warning(msg, *args)
282
283
class OpenFunctionStub(object):
  """Callable replacement for open() backed by the self.files dictionary.

  Calling the stub with a name wraps self.files[name] in a FileStub; the mode
  and any further arguments are ignored. An unknown name raises KeyError.
  """

  class FileStub(object):
    """Minimal file-like wrapper that is also usable as a context manager."""

    def __init__(self, data):
      # Sliced by read(); write() instead expects it to offer write().
      self._data = data

    def __enter__(self):
      return self

    def __exit__(self, *args):
      return None

    def read(self, size=None):
      """Returns the first size items of the data, or all of it.

      Any falsy size (None or 0) yields the complete data. Successive calls
      return the same content.
      """
      return self._data[:size] if size else self._data

    def write(self, data):
      """Forwards data to the write() method of the wrapped object."""
      self._data.write(data)

    def close(self):
      return None

  def __init__(self):
    # Maps a file name to the object handed to FileStub for that name.
    self.files = {}

  def __call__(self, name, *args, **kwargs):
    contents = self.files[name]
    return OpenFunctionStub.FileStub(contents)
312
313
class OsModuleStub(object):
  """Stand-in for the os module with a fake filesystem and environment.

  Files and directories are whatever the test puts into self.path.files and
  self.path.dirs. The environment variables served by getenv() are plain
  attributes of the stub.
  """

  class OsEnvironModuleStub(object):
    """Stand-in for os.environ in which no variable is ever set."""

    def get(self, _, default=None):
      """Returns default, as dict.get does for a missing key."""
      return default

  class OsPathModuleStub(object):
    """Stand-in for os.path driven by the files and dirs lists."""

    def __init__(self, sys_module):
      # Only sys.platform is read, to choose Windows or POSIX path rules.
      self.sys = sys_module
      self.files = []
      self.dirs = []

    def exists(self, path):
      """Returns whether path is listed in self.files."""
      return path in self.files

    def isfile(self, path):
      """Returns whether path is listed in self.files."""
      return path in self.files

    def isdir(self, path):
      """Returns whether path is listed in self.dirs."""
      return path in self.dirs

    def join(self, *paths):
      """Joins paths using the separator of the stubbed platform.

      The host's os.path.join does the joining; afterwards every separator
      is rewritten to '\\' when sys.platform starts with 'win' and to '/'
      otherwise.
      """
      is_windows = self.sys.platform.startswith('win')

      def IsAbsolutePath(path):
        if is_windows:
          return re.match(r'[a-zA-Z]:\\', path)
        return path.startswith('/')

      # Per Python specification, if any component is an absolute path,
      # discard previous components.
      for index, path in reversed(list(enumerate(paths))):
        if IsAbsolutePath(path):
          paths = paths[index:]
          break

      joined = os.path.join(*paths)
      if is_windows:
        return joined.replace('/', '\\')
      return joined.replace('\\', '/')

    def basename(self, path):
      """Returns the basename according to the stubbed platform's rules."""
      if self.sys.platform.startswith('win'):
        return ntpath.basename(path)
      return posixpath.basename(path)

    # The remaining helpers delegate to the host's real os.path.

    @staticmethod
    def abspath(path):
      return os.path.abspath(path)

    @staticmethod
    def expanduser(path):
      return os.path.expanduser(path)

    @staticmethod
    def dirname(path):
      return os.path.dirname(path)

    @staticmethod
    def realpath(path):
      return os.path.realpath(path)

    @staticmethod
    def split(path):
      return os.path.split(path)

    @staticmethod
    def splitext(path):
      return os.path.splitext(path)

    @staticmethod
    def splitdrive(path):
      return os.path.splitdrive(path)

  # Constants copied from the host's real os module.
  X_OK = os.X_OK

  sep = os.sep
  pathsep = os.pathsep

  def __init__(self, sys_module=sys):
    self.path = OsModuleStub.OsPathModuleStub(sys_module)
    self.environ = OsModuleStub.OsEnvironModuleStub()
    # Values served by getenv() for DISPLAY, LOCALAPPDATA, PATH, PROGRAMFILES
    # and PROGRAMFILES(X86) respectively.
    self.display = ':0'
    self.local_app_data = None
    self.sys_path = None
    self.program_files = None
    self.program_files_x86 = None
    self.devnull = os.devnull
    # Maps a directory name to its list of file names; consumed by walk().
    self._directory = {}

  def access(self, path, _):
    """Returns whether path is a known file; the mode is ignored."""
    return path in self.path.files

  def getenv(self, name, value=None):
    """Returns the stubbed value of a supported variable, else value.

    Raises:
      NotImplementedError: if name is not one of the supported variables.
    """
    supported = {
        'DISPLAY': self.display,
        'LOCALAPPDATA': self.local_app_data,
        'PATH': self.sys_path,
        'PROGRAMFILES': self.program_files,
        'PROGRAMFILES(X86)': self.program_files_x86,
    }
    if name not in supported:
      raise NotImplementedError('Unsupported getenv')
    env = supported[name]
    # A falsy stubbed value (None or '') counts as unset.
    return env if env else value

  def chdir(self, path):
    """Does nothing."""
    pass

  def walk(self, top):
    """Yields (top, dir_name, files) for every entry of self._directory."""
    for dir_name in self._directory:
      yield top, dir_name, self._directory[dir_name]
429
430
class PerfControlModuleStub(object):
  """Stand-in for the perf_control module exposing an inert PerfControl."""

  class PerfControlStub(object):
    """Takes the adb argument of the constructor and ignores it."""

    def __init__(self, adb):
      del adb  # unused

  def __init__(self):
    # Expose the class itself, not an instance, under the module-level name.
    stub_class = PerfControlModuleStub.PerfControlStub
    self.PerfControl = stub_class
438
439
class RawInputFunctionStub(object):
  """Callable replacement for raw_input() that returns a preset answer."""

  def __init__(self):
    # The string returned by every call.
    self.input = ''

  def __call__(self, name='', *args, **kwargs):
    """Returns self.input.

    Args:
      name: the prompt; ignored. It is optional, as it is for raw_input().
    """
    return self.input
446
447
class SubprocessModuleStub(object):
  """Stand-in for the subprocess module that never starts a process."""

  class PopenStub(object):
    """One shared object acting as both the Popen callable and its result.

    Calling it returns the object itself, so results preset through
    communicate_result and returncode_result apply to every "process".
    """

    def __init__(self):
      self.returncode_result = 0
      self.communicate_result = ('', '')

    def __call__(self, args, **kwargs):
      del args, kwargs  # unused
      return self

    def communicate(self):
      """Returns the preset communicate_result value."""
      return self.communicate_result

    # Read-only view of returncode_result.
    returncode = property(lambda self: self.returncode_result)

  def __init__(self):
    self.PIPE = None
    self.Popen = SubprocessModuleStub.PopenStub()

  def call(self, *args, **kwargs):
    """Accepts any arguments, does nothing and returns None."""
    return None
470
471
class SysModuleStub(object):
  """Stand-in for the sys module that carries only a platform string."""

  def __init__(self):
    # Starts empty. OsPathModuleStub treats a value that begins with 'win' as
    # Windows and anything else as POSIX.
    self.platform = str()
475
476
class ThermalThrottleModuleStub(object):
  """Stand-in for the thermal_throttle module with an inert ThermalThrottle."""

  class ThermalThrottleStub(object):
    """Takes the adb argument of the constructor and ignores it."""

    def __init__(self, adb):
      del adb  # unused

  def __init__(self):
    # Expose the class itself, not an instance, under the module-level name.
    stub_class = ThermalThrottleModuleStub.ThermalThrottleStub
    self.ThermalThrottle = stub_class
484