| #!/usr/bin/env python3 |
| # Copyright 2017 The Chromium Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """This script is called without any arguments to re-generate all of the *.pem |
| files in the script's parent directory. |
| |
| """ |
| |
| import base64 |
| import datetime |
| import hashlib |
| import subprocess |
| import tempfile |
| |
| from cryptography import x509 |
| from cryptography.hazmat.primitives import hashes, serialization |
| from cryptography.hazmat.primitives.asymmetric import rsa, padding |
| from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID |
| |
| from pyasn1.codec.der import decoder, encoder |
| from pyasn1_modules import rfc2560, rfc2459 |
| from pyasn1.type import namedtype, univ, useful |
| |
| NEXT_SERIAL = 1 |
| |
| # 1/1/2017 00:00 GMT |
| CERT_DATE = datetime.datetime(2017, 1, 1, 0, 0) |
| # 1/1/2018 00:00 GMT |
| CERT_EXPIRE = CERT_DATE + datetime.timedelta(days=365) |
| # 2/1/2017 00:00 GMT |
| REVOKE_DATE = datetime.datetime(2017, 2, 1, 0, 0) |
| # 3/1/2017 00:00 GMT |
| THIS_DATE = datetime.datetime(2017, 3, 1, 0, 0) |
| # 3/2/2017 00:00 GMT |
| PRODUCED_DATE = datetime.datetime(2017, 3, 2, 0, 0) |
| # 3/5/2017 00:00 GMT |
| VERIFY_DATE = datetime.datetime(2017, 3, 5, 0, 0) |
| # 6/1/2017 00:00 GMT |
| NEXT_DATE = datetime.datetime(2017, 6, 1, 0, 0) |
| |
| sha1oid = univ.ObjectIdentifier('1.3.14.3.2.26') |
| sha1rsaoid = univ.ObjectIdentifier('1.2.840.113549.1.1.5') |
| sha256oid = univ.ObjectIdentifier('2.16.840.1.101.3.4.2.1') |
| sha256rsaoid = univ.ObjectIdentifier('1.2.840.113549.1.1.11') |
| |
| |
| def SigAlgOid(sig_alg): |
| if sig_alg == 'sha1': |
| return sha1rsaoid |
| if sig_alg == 'sha256': |
| return sha256rsaoid |
| raise ValueError(f"Unrecognized sig_alg: {sig_alg}") |
| |
| |
| def CreateCert(name, key_path, signer=None, ocsp=False): |
| global NEXT_SERIAL |
| with open(key_path, 'rb') as f: |
| private_key = serialization.load_pem_private_key(f.read(), password=None) |
| subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, name)]) |
| |
| if signer: |
| issuer = signer[1].subject |
| issuer_key = signer[2] |
| else: |
| issuer = subject |
| issuer_key = private_key |
| |
| builder = x509.CertificateBuilder() |
| builder = builder.subject_name(subject) |
| builder = builder.issuer_name(issuer) |
| builder = builder.public_key(private_key.public_key()) |
| builder = builder.serial_number(NEXT_SERIAL) |
| NEXT_SERIAL += 1 |
| builder = builder.not_valid_before(CERT_DATE) |
| builder = builder.not_valid_after(CERT_EXPIRE) |
| if ocsp: |
| builder = builder.add_extension( |
| x509.ExtendedKeyUsage([ExtendedKeyUsageOID.OCSP_SIGNING]), |
| critical=False) |
| cert = builder.sign(issuer_key, hashes.SHA256()) |
| der_cert = cert.public_bytes(serialization.Encoding.DER) |
| asn1cert = decoder.decode(der_cert, asn1Spec=rfc2459.Certificate())[0] |
| |
| if signer: |
| signer_cert = signer[0] |
| else: |
| signer_cert = asn1cert |
| return (asn1cert, cert, private_key, signer_cert) |
| |
| |
| def CreateExtension(oid='1.2.3.4', critical=False): |
| ext = rfc2459.Extension() |
| ext.setComponentByName('extnID', univ.ObjectIdentifier(oid)) |
| ext.setComponentByName('extnValue', b'DEADBEEF') |
| if critical: |
| ext.setComponentByName('critical', univ.Boolean(True)) |
| else: |
| ext.setComponentByName('critical', univ.Boolean(False)) |
| |
| return ext |
| |
| |
| ROOT_CA = CreateCert('Test CA', 'root.key', None) |
| CA = CreateCert('Test Intermediate CA', 'intermediate.key', ROOT_CA) |
| CA_LINK = CreateCert('Test OCSP Signer', 'ocsp_signer.key', CA, True) |
| CA_BADLINK = CreateCert('Test False OCSP Signer', 'bad_ocsp_signer.key', CA, |
| False) |
| CERT = CreateCert('Test Cert', 'cert.key', CA) |
| JUNK_CERT = CreateCert('Random Cert', 'cert2.key', None) |
| EXTENSION = CreateExtension() |
| |
| |
| def GetName(c): |
| rid = rfc2560.ResponderID() |
| subject = c[0].getComponentByName('tbsCertificate').getComponentByName( |
| 'subject') |
| rn = rid.componentType.getTypeByPosition(0).clone() |
| for i in range(len(subject)): |
| rn.setComponentByPosition(i, subject.getComponentByPosition(i)) |
| rid.setComponentByName('byName', rn) |
| return rid |
| |
| |
| def GetKeyHash(c): |
| rid = rfc2560.ResponderID() |
| spk = c[0].getComponentByName('tbsCertificate').getComponentByName( |
| 'subjectPublicKeyInfo').getComponentByName('subjectPublicKey') |
| keyHash = hashlib.sha1(spk.asOctets()).digest() |
| rid.setComponentByName('byKey', keyHash) |
| return rid |
| |
| |
| def CreateSingleResponse(cert=CERT, |
| status=0, |
| this_update=THIS_DATE, |
| next_update=None, |
| revoke_time=None, |
| reason=None, |
| extensions=[]): |
| sr = rfc2560.SingleResponse() |
| cid = sr.setComponentByName('certID').getComponentByName('certID') |
| |
| issuer_tbs = cert[3].getComponentByName('tbsCertificate') |
| tbs = cert[0].getComponentByName('tbsCertificate') |
| name_hash = hashlib.sha1( |
| encoder.encode(issuer_tbs.getComponentByName('subject'))).digest() |
| key_hash = hashlib.sha1( |
| issuer_tbs.getComponentByName('subjectPublicKeyInfo') |
| .getComponentByName('subjectPublicKey').asOctets()).digest() |
| sn = tbs.getComponentByName('serialNumber') |
| |
| ha = cid.setComponentByName('hashAlgorithm').getComponentByName( |
| 'hashAlgorithm') |
| ha.setComponentByName('algorithm', sha1oid) |
| cid.setComponentByName('issuerNameHash', name_hash) |
| cid.setComponentByName('issuerKeyHash', key_hash) |
| cid.setComponentByName('serialNumber', sn) |
| |
| cs = rfc2560.CertStatus() |
| if status == 0: |
| cs.setComponentByName('good') |
| elif status == 1: |
| ri = cs.componentType.getTypeByPosition(1).clone() |
| if revoke_time == None: |
| revoke_time = REVOKE_DATE |
| ri.setComponentByName('revocationTime', |
| useful.GeneralizedTime( |
| revoke_time.strftime('%Y%m%d%H%M%SZ'))) |
| if reason: |
| ri.setComponentByName('revocationReason', reason) |
| cs.setComponentByName('revoked', ri) |
| else: |
| ui = cs.componentType.getTypeByPosition(2).clone() |
| cs.setComponentByName('unknown', ui) |
| |
| sr.setComponentByName('certStatus', cs) |
| |
| sr.setComponentByName('thisUpdate', |
| useful.GeneralizedTime( |
| this_update.strftime('%Y%m%d%H%M%SZ'))) |
| if next_update: |
| sr.setComponentByName('nextUpdate', next_update.strftime('%Y%m%d%H%M%SZ')) |
| if extensions: |
| elist = sr.setComponentByName('singleExtensions').getComponentByName( |
| 'singleExtensions') |
| for i in range(len(extensions)): |
| elist.setComponentByPosition(i, extensions[i]) |
| return sr |
| |
| |
| class BadBasicOCSPResponse(univ.Sequence): |
| componentType = namedtype.NamedTypes( |
| namedtype.NamedType('tbsResponseData', univ.Any()), |
| namedtype.NamedType('signatureAlgorithm', rfc2459.AlgorithmIdentifier()), |
| namedtype.NamedType('signature', univ.BitString()), |
| ) |
| |
| |
| def Create(signer=None, |
| response_status=0, |
| response_type='1.3.6.1.5.5.7.48.1.1', |
| signature=None, |
| version=1, |
| responder=None, |
| responses=None, |
| extensions=None, |
| certs=None, |
| sigAlg='sha1', |
| produced_at=PRODUCED_DATE, |
| invalid_response_data=False): |
| ocsp = rfc2560.OCSPResponse() |
| ocsp.setComponentByName('responseStatus', response_status) |
| |
| if response_status != 0: |
| return encoder.encode(ocsp) |
| |
| if not signer: |
| signer = CA |
| |
| if invalid_response_data: |
| tbs = univ.OctetString(b'invalid') |
| else: |
| tbs = rfc2560.ResponseData() |
| if version != 1: |
| tbs.setComponentByName('version', version) |
| if not responder: |
| responder = GetName(signer) |
| tbs.setComponentByName('responderID', responder) |
| tbs.setComponentByName('producedAt', |
| useful.GeneralizedTime( |
| produced_at.strftime('%Y%m%d%H%M%SZ'))) |
| rlist = tbs.setComponentByName('responses').getComponentByName('responses') |
| if responses == None: |
| responses = [CreateSingleResponse(CERT, 0)] |
| if responses: |
| for i in range(len(responses)): |
| rlist.setComponentByPosition(i, responses[i]) |
| |
| if extensions: |
| elist = tbs.setComponentByName('responseExtensions').getComponentByName( |
| 'responseExtensions') |
| for i in range(len(extensions)): |
| elist.setComponentByPosition(i, extensions[i]) |
| |
| sa = rfc2459.AlgorithmIdentifier() |
| sa.setComponentByName('algorithm', SigAlgOid(sigAlg)) |
| # TODO(mattm): If pyasn1 gives an error |
| # "Component value is tag-incompatible: Null() vs Any()", try hacking |
| # pyasn1_modules/rfc2459.py's AlgorithmIdentifier to specify univ.Null as the |
| # type for 'parameters'. (Which is an ugly hack, but lets the script work.) |
| sa.setComponentByName('parameters', univ.Null()) |
| |
| if invalid_response_data: |
| basic = BadBasicOCSPResponse() |
| else: |
| basic = rfc2560.BasicOCSPResponse() |
| basic.setComponentByName('tbsResponseData', tbs) |
| basic.setComponentByName('signatureAlgorithm', sa) |
| if not signature: |
| if sigAlg == 'sha1': |
| hash_alg = hashes.SHA1() |
| elif sigAlg == 'sha256': |
| hash_alg = hashes.SHA256() |
| else: |
| raise ValueError(f"Unrecognized signature algorithm: {sigAlg}") |
| signature = signer[2].sign(encoder.encode(tbs), padding.PKCS1v15(), |
| hash_alg) |
| |
| basic.setComponentByName('signature', |
| univ.BitString(hexValue=signature.hex())) |
| if certs: |
| cs = basic.setComponentByName('certs').getComponentByName('certs') |
| for i in range(len(certs)): |
| cs.setComponentByPosition(i, certs[i][0]) |
| |
| rbytes = ocsp.componentType.getTypeByPosition(1) |
| rbytes.setComponentByName('responseType', |
| univ.ObjectIdentifier(response_type)) |
| rbytes.setComponentByName('response', encoder.encode(basic)) |
| |
| ocsp.setComponentByName('responseBytes', rbytes) |
| return encoder.encode(ocsp) |
| |
| |
| def MakePemBlock(der, name): |
| b64 = base64.b64encode(der).decode('ascii') |
| wrapped = '\n'.join(b64[pos:pos + 64] for pos in range(0, len(b64), 64)) |
| return '-----BEGIN %s-----\n%s\n-----END %s-----' % (name, wrapped, name) |
| |
| |
| def CreateOCSPRequestDer(issuer_cert_pem, cert_pem): |
| '''Uses OpenSSL to generate a basic OCSPRequest for |cert_pem|.''' |
| |
| with tempfile.NamedTemporaryFile( |
| delete_on_close=False, prefix="issuer_", suffix=".pem" |
| ) as issuer, tempfile.NamedTemporaryFile( |
| delete_on_close=False, prefix="cert_", suffix=".pem" |
| ) as cert, tempfile.NamedTemporaryFile( |
| delete_on_close=False, prefix="request_", suffix=".der" |
| ) as request: |
| issuer.write(issuer_cert_pem.encode('utf-8')) |
| issuer.close() |
| cert.write(cert_pem.encode('utf-8')) |
| cert.close() |
| request.close() |
| |
| p = subprocess.run([ |
| "openssl", "ocsp", "-no_nonce", "-issuer", issuer.name, "-cert", |
| cert.name, "-reqout", request.name |
| ], capture_output=True, check=True) |
| |
| with open(request.name, "rb") as f: |
| return f.read() |
| |
| |
| def Store(fname, description, ca, data_der): |
| ca_cert_pem = ca[1].public_bytes(serialization.Encoding.PEM).decode('ascii') |
| cert_pem = CERT[1].public_bytes(serialization.Encoding.PEM).decode('ascii') |
| |
| ocsp_request_der = CreateOCSPRequestDer(ca_cert_pem, cert_pem) |
| |
| out = ('%s\n%s\n%s\n\n%s\n%s') % ( |
| description, |
| MakePemBlock(data_der, "OCSP RESPONSE"), |
| ca_cert_pem.replace('CERTIFICATE', 'CA CERTIFICATE'), |
| cert_pem, |
| MakePemBlock(ocsp_request_der, "OCSP REQUEST")) |
| with open('%s.pem' % fname, 'w') as f: |
| f.write(out) |
| |
| |
| Store( |
| 'no_response', |
| 'No SingleResponses attached to the response', |
| CA, |
| Create(responses=[])) |
| |
| Store( |
| 'malformed_request', |
| 'Has a status of MALFORMED_REQUEST', |
| CA, |
| Create(response_status=1)) |
| Store( |
| 'bad_status', |
| 'Has an invalid status larger than the defined Status enumeration', |
| CA, |
| Create(response_status=17)) |
| Store( |
| 'bad_ocsp_type', |
| 'Has an invalid OCSP OID', |
| CA, |
| Create(response_type='1.3.6.1.5.5.7.48.1.2')) |
| Store( |
| 'bad_signature', |
| 'Has an invalid signature', |
| CA, |
| Create(signature=b'\xde\xad\xbe\xef')) |
| Store('ocsp_sign_direct', 'Signed directly by the issuer', CA, |
| Create(signer=CA, certs=[])) |
| Store('ocsp_sign_indirect', 'Signed indirectly through an intermediate', CA, |
| Create(signer=CA_LINK, certs=[CA_LINK])) |
| Store('ocsp_sign_indirect_missing', |
| 'Signed indirectly through a missing intermediate', CA, |
| Create(signer=CA_LINK, certs=[])) |
| Store('ocsp_sign_bad_indirect', |
| 'Signed through an intermediate without the correct key usage', CA, |
| Create(signer=CA_BADLINK, certs=[CA_BADLINK])) |
| Store('ocsp_extra_certs', 'Includes extra certs', CA, |
| Create(signer=CA, certs=[CA, CA_LINK])) |
| Store('has_version', 'Includes a default version V1', CA, Create(version=1)) |
| Store( |
| 'responder_name', |
| 'Uses byName to identify the signer', |
| CA, |
| Create(responder=GetName(CA))) |
| |
| # TODO(eroman): pyasn1 module has a bug in rfc2560.ResponderID() that will use |
| # IMPLICIT rather than EXPLICIT tagging for byKey |
| # (https://github.com/etingof/pyasn1-modules/issues/8). If using an affected |
| # version of the library you will need to patch pyasn1_modules/rfc2560.py and |
| # replace "implicitTag" with "explicitTag" in ResponderID to generate this |
| # test data correctly. |
| Store( |
| 'responder_id', |
| 'Uses byKey to identify the signer', |
| CA, |
| Create(responder=GetKeyHash(CA))) |
| Store( |
| 'has_extension', |
| 'Includes an x509v3 extension', |
| CA, |
| Create(extensions=[EXTENSION])) |
| |
| Store( |
| 'good_response', |
| 'Is a valid response for the cert', |
| CA, |
| Create(responses=[CreateSingleResponse(CERT, 0)])) |
| Store('good_response_sha256', |
| 'Is a valid response for the cert with a SHA256 signature', CA, |
| Create(responses=[CreateSingleResponse(CERT, 0)], sigAlg='sha256')) |
| Store( |
| 'good_response_next_update', |
| 'Is a valid response for the cert until nextUpdate', |
| CA, |
| Create(responses=[CreateSingleResponse(CERT, 0, next_update=NEXT_DATE)])) |
| Store( |
| 'revoke_response', |
| 'Is a REVOKE response for the cert', |
| CA, |
| Create(responses=[CreateSingleResponse(CERT, 1)])) |
| Store( |
| 'revoke_response_reason', |
| 'Is a REVOKE response for the cert with a reason', |
| CA, |
| Create(responses=[ |
| CreateSingleResponse(CERT, 1, revoke_time=REVOKE_DATE, reason=1) |
| ])) |
| Store( |
| 'unknown_response', |
| 'Is an UNKNOWN response for the cert', |
| CA, |
| Create(responses=[CreateSingleResponse(CERT, 2)])) |
| Store( |
| 'multiple_response', |
| 'Has multiple responses for the cert', |
| CA, |
| Create(responses=[ |
| CreateSingleResponse(CERT, 0), |
| CreateSingleResponse(CERT, 2) |
| ])) |
| Store( |
| 'other_response', |
| 'Is a response for a different cert', |
| CA, |
| Create(responses=[ |
| CreateSingleResponse(JUNK_CERT, 0), |
| CreateSingleResponse(JUNK_CERT, 1) |
| ])) |
| Store( |
| 'has_single_extension', |
| 'Has an extension in the SingleResponse', |
| CA, |
| Create(responses=[ |
| CreateSingleResponse(CERT, 0, extensions=[CreateExtension()]) |
| ])) |
| Store( |
| 'has_critical_single_extension', |
| 'Has a critical extension in the SingleResponse', CA, |
| Create(responses=[ |
| CreateSingleResponse( |
| CERT, 0, extensions=[CreateExtension('1.2.3.4', critical=True)]) |
| ])) |
| Store( |
| 'has_critical_response_extension', |
| 'Has a critical extension in the ResponseData', CA, |
| Create( |
| responses=[CreateSingleResponse(CERT, 0)], |
| extensions=[CreateExtension('1.2.3.4', critical=True)])) |
| Store( |
| 'has_critical_ct_extension', |
| 'Has a critical CT extension in the SingleResponse', CA, |
| Create(responses=[ |
| CreateSingleResponse( |
| CERT, |
| 0, |
| extensions=[ |
| CreateExtension('1.3.6.1.4.1.11129.2.4.5', critical=True) |
| ]) |
| ])) |
| |
| Store('missing_response', 'Missing a response for the cert', CA, |
| Create(response_status=0, responses=[])) |
| |
| Store('stale_response', 'nextUpdate is before the current time', CA, |
| Create(responses=[ |
| CreateSingleResponse( |
| CERT, |
| status=0, |
| this_update=VERIFY_DATE - datetime.timedelta(days=2), |
| next_update=VERIFY_DATE - datetime.timedelta(days=1), |
| ), |
| ])) |
| |
| Store('future_response', 'thisUpdate is after the current time', CA, |
| Create(responses=[ |
| CreateSingleResponse( |
| CERT, |
| status=0, |
| this_update=VERIFY_DATE + datetime.timedelta(days=1), |
| ), |
| ])) |
| |
| Store('old_response', 'thisUpdate is over a week before the current time', CA, |
| Create(responses=[ |
| CreateSingleResponse( |
| CERT, |
| status=0, |
| this_update=VERIFY_DATE - datetime.timedelta(days=8), |
| ), |
| ])) |
| |
| Store('produced_early_response', 'producedAt is before the cert\'s notBefore', |
| CA, |
| Create(responses=[CreateSingleResponse(CERT, 0)], |
| produced_at=CERT_DATE - datetime.timedelta(days=1))) |
| |
| Store('produced_late_response', 'producedAt is after the cert\'s notAfter', |
| CA, |
| Create(responses=[CreateSingleResponse(CERT, 0)], |
| produced_at=CERT_EXPIRE + datetime.timedelta(days=1))) |
| |
| Store('invalid_response', 'OCSPResponse cannot be parsed', CA, b'invalid') |
| |
| Store('invalid_response_data', 'ResponseData cannot be parsed', CA, |
| Create(invalid_response_data=True)) |
| |
| Store( |
| 'multiple_response_good_revoked', |
| 'Has both a good and a revoked response for the cert', |
| CA, |
| Create(responses=[ |
| CreateSingleResponse(CERT, 0), |
| CreateSingleResponse(CERT, 1), |
| ])) |