# Copyright 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import json
import logging
import os

from telemetry.core import util
from telemetry.core.backends import codepen_credentials_backend
from telemetry.core.backends import facebook_credentials_backend
from telemetry.core.backends import google_credentials_backend
from telemetry.page.actions import action_runner
from telemetry.unittest import options_for_unittests


class CredentialsError(Exception):
  """Error that can be thrown when logging in."""


class BrowserCredentials(object):
  """Merges credentials from several sources and dispatches to backends.

  Credentials are keyed by credentials type. Each registered backend is stored
  under its own `credentials_type` attribute; login-related queries for a type
  are forwarded to the backend registered for that type.
  """

  def __init__(self, backends=None):
    """Creates the credentials store.

    Args:
      backends: Optional iterable of backend objects exposing a
          `credentials_type` attribute. When None, the CodePen, Facebook and
          Google backends are registered.
    """
    self._credentials = {}
    self._credentials_path = None
    self._extra_credentials = {}

    if backends is None:
      backends = [
          codepen_credentials_backend.CodePenCredentialsBackend(),
          facebook_credentials_backend.FacebookCredentialsBackend(),
          google_credentials_backend.GoogleCredentialsBackend()]

    self._backends = {}
    for backend in backends:
      self._backends[backend.credentials_type] = backend

  def _CheckCredentialsType(self, credentials_type):
    """Raises CredentialsError unless a backend handles |credentials_type|."""
    if credentials_type not in self._backends:
      # Interpolate the type into the message; passing it as a second
      # exception argument would leave the '%s' placeholder unformatted.
      raise CredentialsError(
          'Unrecognized credentials type: %s' % credentials_type)

  def AddBackend(self, backend):
    """Registers |backend|; its credentials type must not be registered yet."""
    assert backend.credentials_type not in self._backends
    self._backends[backend.credentials_type] = backend

  def IsLoggedIn(self, credentials_type):
    """Returns the backend's logged-in state for |credentials_type|.

    Returns False without consulting the backend when no credentials are
    known for the type.

    Raises:
      CredentialsError: if no backend is registered for |credentials_type|.
    """
    self._CheckCredentialsType(credentials_type)
    if credentials_type not in self._credentials:
      return False
    return self._backends[credentials_type].IsLoggedIn()

  def CanLogin(self, credentials_type):
    """Returns True if credentials are available for |credentials_type|.

    Raises:
      CredentialsError: if no backend is registered for |credentials_type|.
    """
    self._CheckCredentialsType(credentials_type)
    return credentials_type in self._credentials

  def LoginNeeded(self, tab, credentials_type):
    """Asks the backend for |credentials_type| to handle a login on |tab|.

    Returns False without calling the backend when no credentials are known
    for the type; otherwise returns whatever the backend's LoginNeeded
    returns.

    Raises:
      CredentialsError: if no backend is registered for |credentials_type|.
    """
    self._CheckCredentialsType(credentials_type)
    if credentials_type not in self._credentials:
      return False
    runner = action_runner.ActionRunner(tab)
    return self._backends[credentials_type].LoginNeeded(
        tab, runner, self._credentials[credentials_type])

  def LoginNoLongerNeeded(self, tab, credentials_type):
    """Forwards LoginNoLongerNeeded to the backend for |credentials_type|."""
    assert credentials_type in self._backends
    self._backends[credentials_type].LoginNoLongerNeeded(tab)

  @property
  def credentials_path(self):  # pylint: disable=E0202
    """Path of the JSON credentials file, or None if unset."""
    return self._credentials_path

  @credentials_path.setter
  def credentials_path(self, credentials_path):  # pylint: disable=E0202
    # Changing the path invalidates the merged view, so rebuild it right away.
    self._credentials_path = credentials_path
    self._RebuildCredentials()

  def Add(self, credentials_type, data):
    """Adds extra credential entries for |credentials_type|.

    Args:
      credentials_type: Key under which the entries are stored.
      data: Dict of entries to add; no key may already have been added for
          this type.
    """
    extra = self._extra_credentials.setdefault(credentials_type, {})
    for k, v in data.items():
      assert k not in extra
      extra[k] = v
    self._RebuildCredentials()

  def _ResetLoggedInState(self):
    """Makes the backends think we're not logged in even though we are.
    Should only be used in unit tests to simulate --dont-override-profile.
    """
    for backend in self._backends.values():
      backend._ResetLoggedInState()  # pylint: disable=W0212

  @staticmethod
  def _LoadJsonFile(path):
    """Returns the parsed JSON content of the file at |path|."""
    with open(path, 'r') as f:
      return json.load(f)

  def _RebuildCredentials(self):
    """Recomputes self._credentials from every credentials source.

    Sources are the file at credentials_path (if set and present),
    ~/.telemetry-credentials (if present and options_for_unittests.GetCopy()
    is falsy), and the entries registered through Add().
    """
    credentials = {}
    if (self._credentials_path is not None and
        os.path.exists(self._credentials_path)):
      credentials = self._LoadJsonFile(self._credentials_path)

    # TODO(nduca): use system keychain, if possible.
    homedir_credentials_path = os.path.expanduser('~/.telemetry-credentials')
    homedir_credentials = {}

    if (not options_for_unittests.GetCopy() and
        os.path.exists(homedir_credentials_path)):
      logging.info("Found ~/.telemetry-credentials. Its contents will be used "
                   "when no other credentials can be found.")
      homedir_credentials = self._LoadJsonFile(homedir_credentials_path)

    self._credentials = {}
    all_keys = set(credentials).union(
        homedir_credentials).union(
            self._extra_credentials)

    # Later assignments win: Add() entries override the home-directory file,
    # which in turn overrides the credentials_path file.
    # NOTE(review): the home-directory file overriding credentials_path looks
    # at odds with the "when no other credentials can be found" log message
    # above -- confirm the intended precedence before changing either.
    for k in all_keys:
      if k in credentials:
        self._credentials[k] = credentials[k]
      if k in homedir_credentials:
        logging.info("Will use ~/.telemetry-credentials for %s logins.", k)
        self._credentials[k] = homedir_credentials[k]
      if k in self._extra_credentials:
        self._credentials[k] = self._extra_credentials[k]

  def WarnIfMissingCredentials(self, page_set):
    """Logs a warning listing credentials that pages in |page_set| lack.

    Args:
      page_set: Iterable of pages exposing a `credentials` attribute; it must
          also expose `credentials_path` and `file_path` attributes.

    Raises:
      CredentialsError: if a page names a credentials type with no backend.
    """
    num_pages_missing_login = 0
    missing_credentials = set()
    for page in page_set:
      if page.credentials and not self.CanLogin(page.credentials):
        num_pages_missing_login += 1
        missing_credentials.add(page.credentials)

    if num_pages_missing_login == 0:
      return

    files_to_tweak = []
    if page_set.credentials_path:
      files_to_tweak.append(
          os.path.relpath(os.path.join(os.path.dirname(page_set.file_path),
                                       page_set.credentials_path)))
    files_to_tweak.append('~/.telemetry-credentials')

    example_credentials_file = os.path.join(
        util.GetTelemetryDir(), 'examples', 'credentials_example.json')

    # Sort the names so the message is stable from run to run.
    logging.warning("""
        Credentials for %s were not found. %i pages will not be tested.

        To fix this, either follow the instructions to authenticate to gsutil
        here:
        http://www.chromium.org/developers/telemetry/upload_to_cloud_storage,

        or add your own credentials to:
            %s
        An example credentials file you can copy from is here:
            %s\n""",
                    ', '.join(sorted(missing_credentials)),
                    num_pages_missing_login,
                    ' or '.join(files_to_tweak),
                    example_credentials_file)