1# Copyright 2014 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Routines to generate root and server certificates. 16 17Certificate Naming Conventions: 18 ca_cert: crypto.X509 for the certificate authority (w/ both the pub & 19 priv keys) 20 cert: a crypto.X509 certificate (w/ just the pub key) 21 cert_str: a certificate string (w/ just the pub cert) 22 key: a private crypto.PKey (from ca or pem) 23 ca_cert_str: a certificae authority string (w/ both the pub & priv certs) 24""" 25 26import logging 27import os 28import platform 29import socket 30import subprocess 31import time 32 33openssl_import_error = None 34 35Error = None 36SSL_METHOD = None 37SysCallError = None 38VERIFY_PEER = None 39ZeroReturnError = None 40FILETYPE_PEM = None 41 42try: 43 from OpenSSL import crypto, SSL 44 45 Error = SSL.Error 46 SSL_METHOD = SSL.SSLv23_METHOD 47 SysCallError = SSL.SysCallError 48 VERIFY_PEER = SSL.VERIFY_PEER 49 ZeroReturnError = SSL.ZeroReturnError 50 FILETYPE_PEM = crypto.FILETYPE_PEM 51except ImportError, e: 52 openssl_import_error = e 53 54 55def get_ssl_context(method=SSL_METHOD): 56 # One of: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or TLSv1_METHOD 57 if openssl_import_error: 58 raise openssl_import_error # pylint: disable=raising-bad-type 59 return SSL.Context(method) 60 61 62class WrappedConnection(object): 63 64 def __init__(self, obj): 65 self._wrapped_obj = obj 66 67 def __getattr__(self, attr): 68 if attr in self.__dict__: 69 return getattr(self, attr) 70 return getattr(self._wrapped_obj, attr) 71 72 def recv(self, buflen=1024, flags=0): 73 try: 74 return self._wrapped_obj.recv(buflen, flags) 75 except SSL.SysCallError, e: 76 if e.args[1] == 'Unexpected EOF': 77 return '' 78 raise 79 except SSL.ZeroReturnError: 80 return '' 81 82 83def get_ssl_connection(context, connection): 84 return WrappedConnection(SSL.Connection(context, connection)) 85 86 87def load_privatekey(key, filetype=FILETYPE_PEM): 88 """Loads obj private key object from string.""" 89 return crypto.load_privatekey(filetype, key) 90 91 92def load_cert(cert_str, filetype=FILETYPE_PEM): 93 """Loads obj cert object from string.""" 94 return crypto.load_certificate(filetype, cert_str) 95 96 97def _dump_privatekey(key, filetype=FILETYPE_PEM): 98 """Dumps obj private key object to string.""" 99 return crypto.dump_privatekey(filetype, key) 100 101 102def _dump_cert(cert, filetype=FILETYPE_PEM): 103 """Dumps obj cert object to string.""" 104 return crypto.dump_certificate(filetype, cert) 105 106 107def generate_dummy_ca_cert(subject='_WebPageReplayCert'): 108 """Generates dummy certificate authority. 109 110 Args: 111 subject: a string representing the desired root cert issuer 112 Returns: 113 A tuple of the public key and the private key strings for the root 114 certificate 115 """ 116 if openssl_import_error: 117 raise openssl_import_error # pylint: disable=raising-bad-type 118 119 key = crypto.PKey() 120 key.generate_key(crypto.TYPE_RSA, 1024) 121 122 ca_cert = crypto.X509() 123 ca_cert.set_serial_number(int(time.time()*10000)) 124 ca_cert.set_version(2) 125 ca_cert.get_subject().CN = subject 126 ca_cert.get_subject().O = subject 127 ca_cert.gmtime_adj_notBefore(-60 * 60 * 24 * 365 * 2) 128 ca_cert.gmtime_adj_notAfter(60 * 60 * 24 * 365 * 2) 129 ca_cert.set_issuer(ca_cert.get_subject()) 130 ca_cert.set_pubkey(key) 131 ca_cert.add_extensions([ 132 crypto.X509Extension('basicConstraints', True, 'CA:TRUE'), 133 crypto.X509Extension('nsCertType', True, 'sslCA'), 134 crypto.X509Extension('extendedKeyUsage', True, 135 ('serverAuth,clientAuth,emailProtection,' 136 'timeStamping,msCodeInd,msCodeCom,msCTLSign,' 137 'msSGC,msEFS,nsSGC')), 138 crypto.X509Extension('keyUsage', False, 'keyCertSign, cRLSign'), 139 crypto.X509Extension('subjectKeyIdentifier', False, 'hash', 140 subject=ca_cert), 141 ]) 142 ca_cert.sign(key, 'sha256') 143 key_str = _dump_privatekey(key) 144 ca_cert_str = _dump_cert(ca_cert) 145 return ca_cert_str, key_str 146 147 148def get_host_cert(host, port=443): 149 """Contacts the host and returns its certificate.""" 150 host_certs = [] 151 def verify_cb(conn, cert, errnum, depth, ok): 152 host_certs.append(cert) 153 # Return True to indicates that the certificate was ok. 154 return True 155 156 context = SSL.Context(SSL.SSLv23_METHOD) 157 context.set_verify(SSL.VERIFY_PEER, verify_cb) # Demand a certificate 158 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 159 connection = SSL.Connection(context, s) 160 try: 161 connection.connect((host, port)) 162 connection.send('') 163 except SSL.SysCallError: 164 pass 165 except socket.gaierror: 166 logging.debug('Host name is not valid') 167 finally: 168 connection.shutdown() 169 connection.close() 170 if not host_certs: 171 logging.warning('Unable to get host certificate from %s:%s', host, port) 172 return '' 173 return _dump_cert(host_certs[-1]) 174 175 176def write_dummy_ca_cert(ca_cert_str, key_str, cert_path): 177 """Writes four certificate files. 178 179 For example, if cert_path is "mycert.pem": 180 mycert.pem - CA plus private key 181 mycert-cert.pem - CA in PEM format 182 mycert-cert.cer - CA for Android 183 mycert-cert.p12 - CA in PKCS12 format for Windows devices 184 Args: 185 cert_path: path string such as "mycert.pem" 186 ca_cert_str: certificate string 187 key_str: private key string 188 """ 189 dirname = os.path.dirname(cert_path) 190 if dirname and not os.path.exists(dirname): 191 os.makedirs(dirname) 192 193 root_path = os.path.splitext(cert_path)[0] 194 ca_cert_path = root_path + '-cert.pem' 195 android_cer_path = root_path + '-cert.cer' 196 windows_p12_path = root_path + '-cert.p12' 197 198 # Dump the CA plus private key 199 with open(cert_path, 'w') as f: 200 f.write(key_str) 201 f.write(ca_cert_str) 202 203 # Dump the certificate in PEM format 204 with open(ca_cert_path, 'w') as f: 205 f.write(ca_cert_str) 206 207 # Create a .cer file with the same contents for Android 208 with open(android_cer_path, 'w') as f: 209 f.write(ca_cert_str) 210 211 ca_cert = load_cert(ca_cert_str) 212 key = load_privatekey(key_str) 213 # Dump the certificate in PKCS12 format for Windows devices 214 with open(windows_p12_path, 'w') as f: 215 p12 = crypto.PKCS12() 216 p12.set_certificate(ca_cert) 217 p12.set_privatekey(key) 218 f.write(p12.export()) 219 220 221def generate_cert(root_ca_cert_str, server_cert_str, server_host): 222 """Generates a cert_str with the sni field in server_cert_str signed by the 223 root_ca_cert_str. 224 225 Args: 226 root_ca_cert_str: PEM formatted string representing the root cert 227 server_cert_str: PEM formatted string representing cert 228 server_host: host name to use if there is no server_cert_str 229 Returns: 230 a PEM formatted certificate string 231 """ 232 EXTENSION_WHITELIST = set(['subjectAltName']) 233 234 if openssl_import_error: 235 raise openssl_import_error # pylint: disable=raising-bad-type 236 237 common_name = server_host 238 reused_extensions = [] 239 if server_cert_str: 240 original_cert = load_cert(server_cert_str) 241 common_name = original_cert.get_subject().commonName 242 for i in xrange(original_cert.get_extension_count()): 243 original_cert_extension = original_cert.get_extension(i) 244 if original_cert_extension.get_short_name() in EXTENSION_WHITELIST: 245 reused_extensions.append(original_cert_extension) 246 247 ca_cert = load_cert(root_ca_cert_str) 248 ca_key = load_privatekey(root_ca_cert_str) 249 250 cert = crypto.X509() 251 cert.get_subject().CN = common_name 252 cert.gmtime_adj_notBefore(-60 * 60) 253 cert.gmtime_adj_notAfter(60 * 60 * 24 * 30) 254 cert.set_issuer(ca_cert.get_subject()) 255 cert.set_serial_number(int(time.time()*10000)) 256 cert.set_pubkey(ca_key) 257 cert.add_extensions(reused_extensions) 258 cert.sign(ca_key, 'sha256') 259 260 return _dump_cert(cert) 261 262 263def install_cert_in_nssdb(home_directory_path, certificate_path): 264 """Installs a certificate into the ~/.pki/nssdb database. 265 266 Args: 267 home_directory_path: Path of the home directory where to install 268 certificate_path: Path of a CA in PEM format 269 """ 270 assert os.path.isdir(home_directory_path) 271 assert platform.system() == 'Linux', \ 272 'SSL certification authority has only been tested for linux.' 273 if (os.path.abspath(home_directory_path) == 274 os.path.abspath(os.environ['HOME'])): 275 raise Exception('Modifying $HOME/.pki/nssdb compromises your machine.') 276 277 cert_database_path = os.path.join(home_directory_path, '.pki', 'nssdb') 278 def certutil(args): 279 cmd = ['certutil', '--empty-password', '-d', 'sql:' + cert_database_path] 280 cmd.extend(args) 281 logging.info(subprocess.list2cmdline(cmd)) 282 subprocess.check_call(cmd) 283 284 if not os.path.isdir(cert_database_path): 285 os.makedirs(cert_database_path) 286 certutil(['-N']) 287 288 certutil(['-A', '-t', 'PC,,', '-n', certificate_path, '-i', certificate_path]) 289