1#!/usr/bin/python
2#
3from pyasn1.codec.der import decoder, encoder
4from pyasn1_modules import rfc2560, rfc2459, pem
5from pyasn1.type import univ
6import sys, hashlib
7try:
8  import urllib2
9except ImportError:
10  import urllib.request as urllib2
11
12sha1oid = univ.ObjectIdentifier((1, 3, 14, 3, 2, 26))
13
14class ValueOnlyBitStringEncoder(encoder.encoder.BitStringEncoder):
15    # These methods just do not encode tag and length fields of TLV
16    def encodeTag(self, *args): return ''
17    def encodeLength(self, *args): return ''
18    def encodeValue(*args):
19        substrate, isConstructed = encoder.encoder.BitStringEncoder.encodeValue(*args)
20        # OCSP-specific hack follows: cut off the "unused bit count"
21        # encoded bit-string value.
22        return substrate[1:], isConstructed
23
24    def __call__(self, bitStringValue):
25        return self.encode(None, bitStringValue, defMode=1, maxChunkSize=0)
26
27valueOnlyBitStringEncoder = ValueOnlyBitStringEncoder()
28
29def mkOcspRequest(issuerCert, userCert):
30    issuerTbsCertificate = issuerCert.getComponentByName('tbsCertificate')
31    issuerSubject = issuerTbsCertificate.getComponentByName('subject')
32
33    userTbsCertificate = userCert.getComponentByName('tbsCertificate')
34    userIssuer = userTbsCertificate.getComponentByName('issuer')
35
36    assert issuerSubject == userIssuer, '%s\n%s' % (
37        issuerSubject.prettyPrint(), userIssuer.prettyPrint()
38        )
39
40    userIssuerHash = hashlib.sha1(
41        encoder.encode(userIssuer)
42        ).digest()
43
44    issuerSubjectPublicKey = issuerTbsCertificate.getComponentByName('subjectPublicKeyInfo').getComponentByName('subjectPublicKey')
45
46    issuerKeyHash =  hashlib.sha1(
47        valueOnlyBitStringEncoder(issuerSubjectPublicKey)
48        ).digest()
49
50    userSerialNumber = userTbsCertificate.getComponentByName('serialNumber')
51
52    # Build request object
53
54    request = rfc2560.Request()
55
56    reqCert = request.setComponentByName('reqCert').getComponentByName('reqCert')
57
58    hashAlgorithm = reqCert.setComponentByName('hashAlgorithm').getComponentByName('hashAlgorithm')
59    hashAlgorithm.setComponentByName('algorithm', sha1oid)
60
61    reqCert.setComponentByName('issuerNameHash', userIssuerHash)
62    reqCert.setComponentByName('issuerKeyHash', issuerKeyHash)
63    reqCert.setComponentByName('serialNumber', userSerialNumber)
64
65    ocspRequest = rfc2560.OCSPRequest()
66
67    tbsRequest = ocspRequest.setComponentByName('tbsRequest').getComponentByName('tbsRequest')
68    tbsRequest.setComponentByName('version', 'v1')
69
70    requestList = tbsRequest.setComponentByName('requestList').getComponentByName('requestList')
71    requestList.setComponentByPosition(0, request)
72
73    return ocspRequest
74
75def parseOcspResponse(ocspResponse):
76    responseStatus = ocspResponse.getComponentByName('responseStatus')
77    assert responseStatus == rfc2560.OCSPResponseStatus('successful'), responseStatus.prettyPrint()
78    responseBytes = ocspResponse.getComponentByName('responseBytes')
79    responseType = responseBytes.getComponentByName('responseType')
80    assert responseType == id_pkix_ocsp_basic, responseType.prettyPrint()
81
82    response = responseBytes.getComponentByName('response')
83
84    basicOCSPResponse, _ = decoder.decode(
85        response, asn1Spec=rfc2560.BasicOCSPResponse()
86        )
87
88    tbsResponseData = basicOCSPResponse.getComponentByName('tbsResponseData')
89
90    response0 = tbsResponseData.getComponentByName('responses').getComponentByPosition(0)
91
92    return (
93        tbsResponseData.getComponentByName('producedAt'),
94        response0.getComponentByName('certID'),
95        response0.getComponentByName('certStatus').getName(),
96        response0.getComponentByName('thisUpdate')
97        )
98
99if len(sys.argv) != 2:
100    print("""Usage:
101$ cat CACertificate.pem userCertificate.pem | %s <ocsp-responder-url>""" % sys.argv[0])
102    sys.exit(-1)
103else:
104    ocspUrl = sys.argv[1]
105
106# Parse CA and user certificates
107
108issuerCert, _ = decoder.decode(
109    pem.readPemBlocksFromFile(
110        sys.stdin, ('-----BEGIN CERTIFICATE-----', '-----END CERTIFICATE-----')
111    )[1],
112    asn1Spec=rfc2459.Certificate()
113    )
114userCert, _ = decoder.decode(
115    pem.readPemBlocksFromFile(
116        sys.stdin, ('-----BEGIN CERTIFICATE-----', '-----END CERTIFICATE-----')
117    )[1],
118    asn1Spec=rfc2459.Certificate()
119    )
120
121# Build OCSP request
122
123ocspReq = mkOcspRequest(issuerCert, userCert)
124
125# Use HTTP POST to get response (see Appendix A of RFC 2560)
126# In case you need proxies, set the http_proxy env variable
127
128httpReq = urllib2.Request(
129    ocspUrl,
130    encoder.encode(ocspReq),
131    { 'Content-Type': 'application/ocsp-request' }
132    )
133httpRsp = urllib2.urlopen(httpReq).read()
134
135# Process OCSP response
136
137ocspRsp, _ = decoder.decode(httpRsp, asn1Spec=rfc2560.OCSPResponse())
138
139producedAt, certId, certStatus, thisUpdate = parseOcspResponse(ocspRsp)
140
141print('Certificate ID %s is %s at %s till %s\n' % (
142       certId.getComponentByName('serialNumber'),
143       certStatus,
144       producedAt,
145       thisUpdate))
146