1# Copyright 2012 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Provides stubs for os, sys and subprocess for testing 6 7This test allows one to test code that itself uses os, sys, and subprocess. 8""" 9 10import ntpath 11import os 12import posixpath 13import re 14import shlex 15import sys 16 17 18class Override(object): 19 def __init__(self, base_module, module_list): 20 self.cloud_storage = None 21 self.open = None 22 self.os = None 23 self.perf_control = None 24 self.raw_input = None 25 self.subprocess = None 26 self.sys = None 27 self.thermal_throttle = None 28 self.logging = None 29 stubs = {'cloud_storage': CloudStorageModuleStub, 30 'open': OpenFunctionStub, 31 'os': OsModuleStub, 32 'perf_control': PerfControlModuleStub, 33 'raw_input': RawInputFunctionStub, 34 'subprocess': SubprocessModuleStub, 35 'sys': SysModuleStub, 36 'thermal_throttle': ThermalThrottleModuleStub, 37 'logging': LoggingStub, 38 } 39 self.adb_commands = None 40 self.os = None 41 self.subprocess = None 42 self.sys = None 43 44 self._base_module = base_module 45 self._overrides = {} 46 47 for module_name in module_list: 48 self._overrides[module_name] = getattr(base_module, module_name, None) 49 setattr(self, module_name, stubs[module_name]()) 50 setattr(base_module, module_name, getattr(self, module_name)) 51 52 if self.os and self.sys: 53 self.os.path.sys = self.sys 54 55 def __del__(self): 56 assert not len(self._overrides) 57 58 def Restore(self): 59 for module_name, original_module in self._overrides.iteritems(): 60 if original_module is None: 61 # This will happen when we override built-in functions, like open. 62 # If we don't delete the attribute, we will shadow the built-in 63 # function with an attribute set to None. 64 delattr(self._base_module, module_name) 65 else: 66 setattr(self._base_module, module_name, original_module) 67 self._overrides = {} 68 69 70class AdbDevice(object): 71 72 def __init__(self): 73 self.has_root = False 74 self.needs_su = False 75 self.shell_command_handlers = {} 76 self.mock_content = [] 77 self.system_properties = {} 78 if self.system_properties.get('ro.product.cpu.abi') == None: 79 self.system_properties['ro.product.cpu.abi'] = 'armeabi-v7a' 80 81 def HasRoot(self): 82 return self.has_root 83 84 def NeedsSU(self): 85 return self.needs_su 86 87 def RunShellCommand(self, args, **kwargs): 88 del kwargs # unused 89 if isinstance(args, basestring): 90 args = shlex.split(args) 91 handler = self.shell_command_handlers[args[0]] 92 return handler(args) 93 94 def FileExists(self, _): 95 return False 96 97 def ReadFile(self, device_path, as_root=False): 98 del device_path, as_root # unused 99 return self.mock_content 100 101 def GetProp(self, property_name): 102 return self.system_properties[property_name] 103 104 def SetProp(self, property_name, property_value): 105 self.system_properties[property_name] = property_value 106 107 108class CloudStorageModuleStub(object): 109 PUBLIC_BUCKET = 'chromium-telemetry' 110 PARTNER_BUCKET = 'chrome-partner-telemetry' 111 INTERNAL_BUCKET = 'chrome-telemetry' 112 BUCKET_ALIASES = { 113 'public': PUBLIC_BUCKET, 114 'partner': PARTNER_BUCKET, 115 'internal': INTERNAL_BUCKET, 116 } 117 118 # These are used to test for CloudStorage errors. 119 INTERNAL_PERMISSION = 2 120 PARTNER_PERMISSION = 1 121 PUBLIC_PERMISSION = 0 122 # Not logged in. 123 CREDENTIALS_ERROR_PERMISSION = -1 124 125 class NotFoundError(Exception): 126 pass 127 128 class CloudStorageError(Exception): 129 pass 130 131 class PermissionError(CloudStorageError): 132 pass 133 134 class CredentialsError(CloudStorageError): 135 pass 136 137 def __init__(self): 138 self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{}, 139 CloudStorageModuleStub.PARTNER_BUCKET:{}, 140 CloudStorageModuleStub.PUBLIC_BUCKET:{}} 141 self.remote_paths = self.default_remote_paths 142 self.local_file_hashes = {} 143 self.local_hash_files = {} 144 self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION 145 self.downloaded_files = [] 146 147 def SetPermissionLevelForTesting(self, permission_level): 148 self.permission_level = permission_level 149 150 def CheckPermissionLevelForBucket(self, bucket): 151 if bucket == CloudStorageModuleStub.PUBLIC_BUCKET: 152 return 153 elif (self.permission_level == 154 CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION): 155 raise CloudStorageModuleStub.CredentialsError() 156 elif bucket == CloudStorageModuleStub.PARTNER_BUCKET: 157 if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION: 158 raise CloudStorageModuleStub.PermissionError() 159 elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET: 160 if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION: 161 raise CloudStorageModuleStub.PermissionError() 162 elif bucket not in self.remote_paths: 163 raise CloudStorageModuleStub.NotFoundError() 164 165 def SetRemotePathsForTesting(self, remote_path_dict=None): 166 if not remote_path_dict: 167 self.remote_paths = self.default_remote_paths 168 return 169 self.remote_paths = remote_path_dict 170 171 def GetRemotePathsForTesting(self): 172 if not self.remote_paths: 173 self.remote_paths = self.default_remote_paths 174 return self.remote_paths 175 176 # Set a dictionary of data files and their "calculated" hashes. 177 def SetCalculatedHashesForTesting(self, calculated_hash_dictionary): 178 self.local_file_hashes = calculated_hash_dictionary 179 180 def GetLocalDataFiles(self): 181 return self.local_file_hashes.keys() 182 183 # Set a dictionary of hash files and the hashes they should contain. 184 def SetHashFileContentsForTesting(self, hash_file_dictionary): 185 self.local_hash_files = hash_file_dictionary 186 187 def GetLocalHashFiles(self): 188 return self.local_hash_files.keys() 189 190 def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash): 191 self.remote_paths[bucket][remote_path] = new_hash 192 193 def List(self, bucket): 194 if not bucket or not bucket in self.remote_paths: 195 bucket_error = ('Incorrect bucket specified, correct buckets:' + 196 str(self.remote_paths)) 197 raise CloudStorageModuleStub.CloudStorageError(bucket_error) 198 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) 199 return list(self.remote_paths[bucket].keys()) 200 201 def Exists(self, bucket, remote_path): 202 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) 203 return remote_path in self.remote_paths[bucket] 204 205 def Insert(self, bucket, remote_path, local_path): 206 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) 207 if not local_path in self.GetLocalDataFiles(): 208 file_path_error = 'Local file path does not exist' 209 raise CloudStorageModuleStub.CloudStorageError(file_path_error) 210 self.remote_paths[bucket][remote_path] = ( 211 CloudStorageModuleStub.CalculateHash(self, local_path)) 212 return remote_path 213 214 def GetHelper(self, bucket, remote_path, local_path, only_if_changed): 215 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) 216 if not remote_path in self.remote_paths[bucket]: 217 if only_if_changed: 218 return False 219 raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.') 220 remote_hash = self.remote_paths[bucket][remote_path] 221 local_hash = self.local_file_hashes[local_path] 222 if only_if_changed and remote_hash == local_hash: 223 return False 224 self.downloaded_files.append(remote_path) 225 self.local_file_hashes[local_path] = remote_hash 226 self.local_hash_files[local_path + '.sha1'] = remote_hash 227 return remote_hash 228 229 def Get(self, bucket, remote_path, local_path): 230 return CloudStorageModuleStub.GetHelper(self, bucket, remote_path, 231 local_path, False) 232 233 def GetIfChanged(self, local_path, bucket=None): 234 remote_path = os.path.basename(local_path) 235 if bucket: 236 return CloudStorageModuleStub.GetHelper(self, bucket, remote_path, 237 local_path, True) 238 result = CloudStorageModuleStub.GetHelper( 239 self, self.PUBLIC_BUCKET, remote_path, local_path, True) 240 if not result: 241 result = CloudStorageModuleStub.GetHelper( 242 self, self.PARTNER_BUCKET, remote_path, local_path, True) 243 if not result: 244 result = CloudStorageModuleStub.GetHelper( 245 self, self.INTERNAL_BUCKET, remote_path, local_path, True) 246 return result 247 248 def GetFilesInDirectoryIfChanged(self, directory, bucket): 249 if os.path.dirname(directory) == directory: # If in the root dir. 250 raise ValueError('Trying to serve root directory from HTTP server.') 251 for dirpath, _, filenames in os.walk(directory): 252 for filename in filenames: 253 path, extension = os.path.splitext( 254 os.path.join(dirpath, filename)) 255 if extension != '.sha1': 256 continue 257 self.GetIfChanged(path, bucket) 258 259 def CalculateHash(self, file_path): 260 return self.local_file_hashes[file_path] 261 262 def ReadHash(self, hash_path): 263 return self.local_hash_files[hash_path] 264 265 266class LoggingStub(object): 267 def __init__(self): 268 self.warnings = [] 269 self.errors = [] 270 271 def info(self, msg, *args): 272 pass 273 274 def error(self, msg, *args): 275 self.errors.append(msg % args) 276 277 def warning(self, msg, *args): 278 self.warnings.append(msg % args) 279 280 def warn(self, msg, *args): 281 self.warning(msg, *args) 282 283 284class OpenFunctionStub(object): 285 class FileStub(object): 286 def __init__(self, data): 287 self._data = data 288 289 def __enter__(self): 290 return self 291 292 def __exit__(self, *args): 293 pass 294 295 def read(self, size=None): 296 if size: 297 return self._data[:size] 298 else: 299 return self._data 300 301 def write(self, data): 302 self._data.write(data) 303 304 def close(self): 305 pass 306 307 def __init__(self): 308 self.files = {} 309 310 def __call__(self, name, *args, **kwargs): 311 return OpenFunctionStub.FileStub(self.files[name]) 312 313 314class OsModuleStub(object): 315 class OsEnvironModuleStub(object): 316 def get(self, _): 317 return None 318 319 class OsPathModuleStub(object): 320 def __init__(self, sys_module): 321 self.sys = sys_module 322 self.files = [] 323 self.dirs = [] 324 325 def exists(self, path): 326 return path in self.files 327 328 def isfile(self, path): 329 return path in self.files 330 331 def isdir(self, path): 332 return path in self.dirs 333 334 def join(self, *paths): 335 def IsAbsolutePath(path): 336 if self.sys.platform.startswith('win'): 337 return re.match('[a-zA-Z]:\\\\', path) 338 else: 339 return path.startswith('/') 340 341 # Per Python specification, if any component is an absolute path, 342 # discard previous components. 343 for index, path in reversed(list(enumerate(paths))): 344 if IsAbsolutePath(path): 345 paths = paths[index:] 346 break 347 348 if self.sys.platform.startswith('win'): 349 tmp = os.path.join(*paths) 350 return tmp.replace('/', '\\') 351 else: 352 tmp = os.path.join(*paths) 353 return tmp.replace('\\', '/') 354 355 def basename(self, path): 356 if self.sys.platform.startswith('win'): 357 return ntpath.basename(path) 358 else: 359 return posixpath.basename(path) 360 361 @staticmethod 362 def abspath(path): 363 return os.path.abspath(path) 364 365 @staticmethod 366 def expanduser(path): 367 return os.path.expanduser(path) 368 369 @staticmethod 370 def dirname(path): 371 return os.path.dirname(path) 372 373 @staticmethod 374 def realpath(path): 375 return os.path.realpath(path) 376 377 @staticmethod 378 def split(path): 379 return os.path.split(path) 380 381 @staticmethod 382 def splitext(path): 383 return os.path.splitext(path) 384 385 @staticmethod 386 def splitdrive(path): 387 return os.path.splitdrive(path) 388 389 X_OK = os.X_OK 390 391 sep = os.sep 392 pathsep = os.pathsep 393 394 def __init__(self, sys_module=sys): 395 self.path = OsModuleStub.OsPathModuleStub(sys_module) 396 self.environ = OsModuleStub.OsEnvironModuleStub() 397 self.display = ':0' 398 self.local_app_data = None 399 self.sys_path = None 400 self.program_files = None 401 self.program_files_x86 = None 402 self.devnull = os.devnull 403 self._directory = {} 404 405 def access(self, path, _): 406 return path in self.path.files 407 408 def getenv(self, name, value=None): 409 if name == 'DISPLAY': 410 env = self.display 411 elif name == 'LOCALAPPDATA': 412 env = self.local_app_data 413 elif name == 'PATH': 414 env = self.sys_path 415 elif name == 'PROGRAMFILES': 416 env = self.program_files 417 elif name == 'PROGRAMFILES(X86)': 418 env = self.program_files_x86 419 else: 420 raise NotImplementedError('Unsupported getenv') 421 return env if env else value 422 423 def chdir(self, path): 424 pass 425 426 def walk(self, top): 427 for dir_name in self._directory: 428 yield top, dir_name, self._directory[dir_name] 429 430 431class PerfControlModuleStub(object): 432 class PerfControlStub(object): 433 def __init__(self, adb): 434 pass 435 436 def __init__(self): 437 self.PerfControl = PerfControlModuleStub.PerfControlStub 438 439 440class RawInputFunctionStub(object): 441 def __init__(self): 442 self.input = '' 443 444 def __call__(self, name, *args, **kwargs): 445 return self.input 446 447 448class SubprocessModuleStub(object): 449 class PopenStub(object): 450 def __init__(self): 451 self.communicate_result = ('', '') 452 self.returncode_result = 0 453 454 def __call__(self, args, **kwargs): 455 return self 456 457 def communicate(self): 458 return self.communicate_result 459 460 @property 461 def returncode(self): 462 return self.returncode_result 463 464 def __init__(self): 465 self.Popen = SubprocessModuleStub.PopenStub() 466 self.PIPE = None 467 468 def call(self, *args, **kwargs): 469 pass 470 471 472class SysModuleStub(object): 473 def __init__(self): 474 self.platform = '' 475 476 477class ThermalThrottleModuleStub(object): 478 class ThermalThrottleStub(object): 479 def __init__(self, adb): 480 pass 481 482 def __init__(self): 483 self.ThermalThrottle = ThermalThrottleModuleStub.ThermalThrottleStub 484