blob: 978d74e1c4747408c0f008cde6f4b533de2f99d8 [file]
#!/usr/bin/env python3
# Copyright 2017 The Chromium Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
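"""Re-generates all of the OCSP test data *.pem files.

This script is called without any arguments to re-generate all of the *.pem
files in the script's parent directory. The *.key inputs are opened, and the
*.pem outputs are written, using paths relative to the current working
directory, so run the script from the directory that holds those files. The
`openssl` command-line tool must be available on PATH.
"""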
import base64
import datetime
import hashlib
import subprocess
import tempfile
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID
from pyasn1.codec.der import decoder, encoder
from pyasn1_modules import rfc2560, rfc2459
from pyasn1.type import namedtype, univ, useful
# Serial number given to the next certificate built by CreateCert, which
# increments it after every certificate it creates.
NEXT_SERIAL = 1
# All timestamps below are hard-coded, timezone-naive datetimes. The comments
# give each one as month/day/year.
# 1/1/2017 00:00 GMT
CERT_DATE = datetime.datetime(2017, 1, 1, 0, 0)
# 1/1/2018 00:00 GMT
CERT_EXPIRE = CERT_DATE + datetime.timedelta(days=365)
# 2/1/2017 00:00 GMT
REVOKE_DATE = datetime.datetime(2017, 2, 1, 0, 0)
# 3/1/2017 00:00 GMT
THIS_DATE = datetime.datetime(2017, 3, 1, 0, 0)
# 3/2/2017 00:00 GMT
PRODUCED_DATE = datetime.datetime(2017, 3, 2, 0, 0)
# 3/5/2017 00:00 GMT
VERIFY_DATE = datetime.datetime(2017, 3, 5, 0, 0)
# 6/1/2017 00:00 GMT
NEXT_DATE = datetime.datetime(2017, 6, 1, 0, 0)
# SHA-1 digest OID; written into CertID.hashAlgorithm by CreateSingleResponse.
sha1oid = univ.ObjectIdentifier('1.3.14.3.2.26')
# sha1WithRSAEncryption; returned by SigAlgOid('sha1').
sha1rsaoid = univ.ObjectIdentifier('1.2.840.113549.1.1.5')
# SHA-256 digest OID.
sha256oid = univ.ObjectIdentifier('2.16.840.1.101.3.4.2.1')
# sha256WithRSAEncryption; returned by SigAlgOid('sha256').
sha256rsaoid = univ.ObjectIdentifier('1.2.840.113549.1.1.11')
def SigAlgOid(sig_alg):
    """Return the RSA signature algorithm OID for a digest name.

    Args:
        sig_alg: Either 'sha1' or 'sha256'.

    Returns:
        The module-level sha1rsaoid or sha256rsaoid object.

    Raises:
        ValueError: If sig_alg is not one of the two supported names.
    """
    supported = (
        ('sha1', sha1rsaoid),
        ('sha256', sha256rsaoid),
    )
    for digest_name, signature_oid in supported:
        if sig_alg == digest_name:
            return signature_oid
    raise ValueError(f"Unrecognized sig_alg: {sig_alg}")
def CreateCert(name, key_path, signer=None, ocsp=False):
    """Build a test certificate for the private key stored at key_path.

    The certificate is valid from CERT_DATE to CERT_EXPIRE, is signed with
    SHA-256, and takes its serial number from the module-level NEXT_SERIAL
    counter, which is incremented as a side effect.

    Args:
        name: Common name used as the certificate subject.
        key_path: Path of an unencrypted PEM private key; its public half
            becomes the certificate's public key.
        signer: Tuple previously returned by CreateCert for the issuer. When
            falsy the certificate is self-signed.
        ocsp: When true, a non-critical ExtendedKeyUsage extension containing
            OCSP_SIGNING is added.

    Returns:
        A 4-tuple of (pyasn1 certificate, cryptography certificate, private
        key, pyasn1 certificate of the issuer). For a self-signed certificate
        the first and last elements are the same object.
    """
    global NEXT_SERIAL

    with open(key_path, 'rb') as key_file:
        key_pem = key_file.read()
    private_key = serialization.load_pem_private_key(key_pem, password=None)

    subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, name)])
    if signer:
        issuer, issuer_key = signer[1].subject, signer[2]
    else:
        issuer, issuer_key = subject, private_key

    builder = (x509.CertificateBuilder()
               .subject_name(subject)
               .issuer_name(issuer)
               .public_key(private_key.public_key())
               .serial_number(NEXT_SERIAL))
    # Consume the serial only once it has been accepted by the builder.
    NEXT_SERIAL += 1
    builder = (builder
               .not_valid_before(CERT_DATE)
               .not_valid_after(CERT_EXPIRE))
    if ocsp:
        ocsp_eku = x509.ExtendedKeyUsage([ExtendedKeyUsageOID.OCSP_SIGNING])
        builder = builder.add_extension(ocsp_eku, critical=False)

    cert = builder.sign(issuer_key, hashes.SHA256())

    # Re-parse the DER with pyasn1 so the OCSP code can read its fields.
    asn1cert = decoder.decode(
        cert.public_bytes(serialization.Encoding.DER),
        asn1Spec=rfc2459.Certificate())[0]

    signer_cert = signer[0] if signer else asn1cert
    return (asn1cert, cert, private_key, signer_cert)
def CreateExtension(oid='1.2.3.4', critical=False):
    """Build an rfc2459.Extension with a fixed placeholder value.

    Args:
        oid: Dotted-decimal OID string for extnID.
        critical: Truthiness selects the value of the 'critical' flag.

    Returns:
        The populated rfc2459.Extension. extnValue is always the eight ASCII
        bytes b'DEADBEEF' (the literal text, not decoded hex).
    """
    extension = rfc2459.Extension()
    fields = (
        ('extnID', univ.ObjectIdentifier(oid)),
        ('extnValue', b'DEADBEEF'),
        ('critical', univ.Boolean(bool(critical))),
    )
    for field_name, field_value in fields:
        extension.setComponentByName(field_name, field_value)
    return extension
# Test certificate hierarchy. Every value is the 4-tuple returned by
# CreateCert. Each call consumes the next NEXT_SERIAL value, so the order of
# these statements determines the serial numbers.
# Self-signed root (signer is None).
ROOT_CA = CreateCert('Test CA', 'root.key', None)
# Intermediate issued by ROOT_CA; Create() signs with it by default.
CA = CreateCert('Test Intermediate CA', 'intermediate.key', ROOT_CA)
# Issued by CA with the OCSP_SIGNING extended key usage.
CA_LINK = CreateCert('Test OCSP Signer', 'ocsp_signer.key', CA, True)
# Issued by CA without the OCSP_SIGNING extended key usage.
CA_BADLINK = CreateCert('Test False OCSP Signer', 'bad_ocsp_signer.key', CA,
False)
# Certificate issued by CA; the default subject of CreateSingleResponse and
# the certificate written into every output file by Store.
CERT = CreateCert('Test Cert', 'cert.key', CA)
# Self-signed certificate outside the hierarchy above.
JUNK_CERT = CreateCert('Random Cert', 'cert2.key', None)
# Extension built with CreateExtension's defaults (OID 1.2.3.4, non-critical).
EXTENSION = CreateExtension()
def GetName(c):
    """Build a ResponderID that identifies certificate c by its subject name.

    Args:
        c: Tuple returned by CreateCert; only c[0], the pyasn1 certificate, is
            read.

    Returns:
        An rfc2560.ResponderID with its 'byName' alternative set to a copy of
        the certificate's subject.
    """
    tbs_cert = c[0].getComponentByName('tbsCertificate')
    subject = tbs_cert.getComponentByName('subject')

    responder_id = rfc2560.ResponderID()
    # Clone the schema object at position 0 of ResponderID so the copy carries
    # that alternative's own tagging.
    by_name = responder_id.componentType.getTypeByPosition(0).clone()
    # Copy by position through the pyasn1 accessors; `subject` is a pyasn1
    # object, not a plain Python sequence.
    for position in range(len(subject)):
        component = subject.getComponentByPosition(position)
        by_name.setComponentByPosition(position, component)
    responder_id.setComponentByName('byName', by_name)
    return responder_id
def GetKeyHash(c):
    """Build a ResponderID that identifies certificate c by its key hash.

    Args:
        c: Tuple returned by CreateCert; only c[0], the pyasn1 certificate, is
            read.

    Returns:
        An rfc2560.ResponderID whose 'byKey' alternative holds the SHA-1
        digest of the certificate's subjectPublicKey octets.
    """
    public_key_bits = (
        c[0].getComponentByName('tbsCertificate')
        .getComponentByName('subjectPublicKeyInfo')
        .getComponentByName('subjectPublicKey'))
    digest = hashlib.sha1(public_key_bits.asOctets()).digest()

    responder_id = rfc2560.ResponderID()
    responder_id.setComponentByName('byKey', digest)
    return responder_id
def CreateSingleResponse(cert=CERT,
                         status=0,
                         this_update=THIS_DATE,
                         next_update=None,
                         revoke_time=None,
                         reason=None,
                         extensions=None):
    """Build an rfc2560.SingleResponse reporting the status of one certificate.

    Args:
        cert: Tuple returned by CreateCert for the certificate being reported
            on. cert[0] supplies the serial number; cert[3] (the issuer's
            pyasn1 certificate) supplies the issuer name and key hashes.
        status: 0 produces 'good', 1 produces 'revoked', and any other value
            produces 'unknown'.
        this_update: datetime written to thisUpdate.
        next_update: Optional datetime written to nextUpdate; the field is
            omitted when this is falsy.
        revoke_time: datetime written to revocationTime when status is 1.
            Defaults to REVOKE_DATE.
        reason: Optional value for revocationReason when status is 1. None
            omits the field; 0 is a valid value and is written.
        extensions: Optional list of rfc2459.Extension objects written to
            singleExtensions; the field is omitted when empty or None.

    Returns:
        The populated rfc2560.SingleResponse.
    """
    sr = rfc2560.SingleResponse()
    cid = sr.setComponentByName('certID').getComponentByName('certID')

    issuer_tbs = cert[3].getComponentByName('tbsCertificate')
    tbs = cert[0].getComponentByName('tbsCertificate')
    # CertID hashes: SHA-1 over the DER-encoded issuer subject name and over
    # the octets of the issuer's subjectPublicKey.
    name_hash = hashlib.sha1(
        encoder.encode(issuer_tbs.getComponentByName('subject'))).digest()
    key_hash = hashlib.sha1(
        issuer_tbs.getComponentByName('subjectPublicKeyInfo')
        .getComponentByName('subjectPublicKey').asOctets()).digest()
    sn = tbs.getComponentByName('serialNumber')

    ha = cid.setComponentByName('hashAlgorithm').getComponentByName(
        'hashAlgorithm')
    ha.setComponentByName('algorithm', sha1oid)
    cid.setComponentByName('issuerNameHash', name_hash)
    cid.setComponentByName('issuerKeyHash', key_hash)
    cid.setComponentByName('serialNumber', sn)

    cs = rfc2560.CertStatus()
    if status == 0:
        cs.setComponentByName('good')
    elif status == 1:
        ri = cs.componentType.getTypeByPosition(1).clone()
        if revoke_time is None:
            revoke_time = REVOKE_DATE
        ri.setComponentByName('revocationTime',
                              useful.GeneralizedTime(
                                  revoke_time.strftime('%Y%m%d%H%M%SZ')))
        # Compare against None so that reason code 0 is not silently dropped.
        if reason is not None:
            ri.setComponentByName('revocationReason', reason)
        cs.setComponentByName('revoked', ri)
    else:
        ui = cs.componentType.getTypeByPosition(2).clone()
        cs.setComponentByName('unknown', ui)
    sr.setComponentByName('certStatus', cs)

    sr.setComponentByName('thisUpdate',
                          useful.GeneralizedTime(
                              this_update.strftime('%Y%m%d%H%M%SZ')))
    if next_update:
        # NOTE(review): nextUpdate is passed as a plain string rather than a
        # GeneralizedTime object, presumably so pyasn1 builds the value from
        # the field's own (tagged) type -- confirm before changing.
        sr.setComponentByName('nextUpdate',
                              next_update.strftime('%Y%m%d%H%M%SZ'))
    if extensions:
        elist = sr.setComponentByName('singleExtensions').getComponentByName(
            'singleExtensions')
        for position, extension in enumerate(extensions):
            elist.setComponentByPosition(position, extension)
    return sr
class BadBasicOCSPResponse(univ.Sequence):
    """Three-field sequence whose tbsResponseData accepts any ASN.1 value.

    Create() uses this in place of rfc2560.BasicOCSPResponse when it is asked
    for invalid response data, so that an arbitrary value can be stored in
    tbsResponseData. The schema declares no 'certs' component.
    """

    componentType = namedtype.NamedTypes(
        # Typed as Any so that a value that is not a ResponseData is accepted.
        namedtype.NamedType('tbsResponseData', univ.Any()),
        namedtype.NamedType('signatureAlgorithm',
                            rfc2459.AlgorithmIdentifier()),
        namedtype.NamedType('signature', univ.BitString()),
    )
def Create(signer=None,
           response_status=0,
           response_type='1.3.6.1.5.5.7.48.1.1',
           signature=None,
           version=1,
           responder=None,
           responses=None,
           extensions=None,
           certs=None,
           sigAlg='sha1',
           produced_at=PRODUCED_DATE,
           invalid_response_data=False):
    """Build a DER-encoded OCSPResponse.

    Args:
        signer: Tuple returned by CreateCert whose private key (signer[2])
            signs the response. Defaults to CA when falsy.
        response_status: Value for responseStatus. Any non-zero value makes
            the function return a response containing only the status.
        response_type: Dotted-decimal OID string for responseType.
        signature: Signature bytes to embed as-is. When falsy, a PKCS#1 v1.5
            signature over the encoded tbsResponseData is computed.
        version: Written to the 'version' field only when it is not 1.
        responder: ResponderID to embed. Defaults to GetName(signer).
        responses: List of SingleResponse objects. None means a single 'good'
            response for CERT; an empty list means no responses at all.
        extensions: Optional list of extensions for responseExtensions.
        certs: Optional list of CreateCert tuples whose pyasn1 certificates
            are embedded in the 'certs' field.
        sigAlg: 'sha1' or 'sha256'; selects both the signature algorithm OID
            and the digest used when signing.
        produced_at: datetime written to producedAt.
        invalid_response_data: When true, tbsResponseData is the OctetString
            b'invalid' and version, responder, responses, extensions and
            produced_at are not used.

    Returns:
        The DER encoding of the OCSPResponse, as bytes.

    Raises:
        ValueError: If sigAlg is not a recognized algorithm name.
    """
    ocsp = rfc2560.OCSPResponse()
    ocsp.setComponentByName('responseStatus', response_status)
    if response_status != 0:
        # Non-successful responses are emitted without responseBytes.
        return encoder.encode(ocsp)

    if not signer:
        signer = CA

    if invalid_response_data:
        tbs = univ.OctetString(b'invalid')
    else:
        tbs = rfc2560.ResponseData()
        if version != 1:
            tbs.setComponentByName('version', version)

        if not responder:
            responder = GetName(signer)
        tbs.setComponentByName('responderID', responder)
        tbs.setComponentByName('producedAt',
                               useful.GeneralizedTime(
                                   produced_at.strftime('%Y%m%d%H%M%SZ')))

        # Always materialise the 'responses' field, even when it stays empty.
        rlist = tbs.setComponentByName('responses').getComponentByName(
            'responses')
        if responses is None:
            responses = [CreateSingleResponse(CERT, 0)]
        for position, single_response in enumerate(responses):
            rlist.setComponentByPosition(position, single_response)

        if extensions:
            elist = tbs.setComponentByName(
                'responseExtensions').getComponentByName('responseExtensions')
            for position, extension in enumerate(extensions):
                elist.setComponentByPosition(position, extension)

    sa = rfc2459.AlgorithmIdentifier()
    sa.setComponentByName('algorithm', SigAlgOid(sigAlg))
    # TODO(mattm): If pyasn1 gives an error
    # "Component value is tag-incompatible: Null() vs Any()", try hacking
    # pyasn1_modules/rfc2459.py's AlgorithmIdentifier to specify univ.Null as
    # the type for 'parameters'. (Which is an ugly hack, but lets the script
    # work.)
    sa.setComponentByName('parameters', univ.Null())

    if invalid_response_data:
        basic = BadBasicOCSPResponse()
    else:
        basic = rfc2560.BasicOCSPResponse()
    basic.setComponentByName('tbsResponseData', tbs)
    basic.setComponentByName('signatureAlgorithm', sa)

    if not signature:
        if sigAlg == 'sha1':
            hash_alg = hashes.SHA1()
        elif sigAlg == 'sha256':
            hash_alg = hashes.SHA256()
        else:
            # Defensive only: SigAlgOid above has already rejected other names.
            raise ValueError(f"Unrecognized signature algorithm: {sigAlg}")
        signature = signer[2].sign(encoder.encode(tbs), padding.PKCS1v15(),
                                   hash_alg)
    basic.setComponentByName('signature',
                             univ.BitString(hexValue=signature.hex()))

    if certs:
        cs = basic.setComponentByName('certs').getComponentByName('certs')
        for position, cert_tuple in enumerate(certs):
            cs.setComponentByPosition(position, cert_tuple[0])

    # Work on a clone so the schema object held in OCSPResponse.componentType
    # is not mutated and shared between calls; the other getTypeByPosition()
    # uses in this file clone in the same way.
    rbytes = ocsp.componentType.getTypeByPosition(1).clone()
    rbytes.setComponentByName('responseType',
                              univ.ObjectIdentifier(response_type))
    rbytes.setComponentByName('response', encoder.encode(basic))
    ocsp.setComponentByName('responseBytes', rbytes)
    return encoder.encode(ocsp)
def MakePemBlock(der, name):
    """Wrap DER bytes in a PEM block.

    Args:
        der: Bytes to base64-encode.
        name: Label placed in the BEGIN/END lines, e.g. "OCSP RESPONSE".

    Returns:
        A str made of the BEGIN line, the base64 text folded into lines of at
        most 64 characters, and the END line, joined by '\\n' with no trailing
        newline. Empty input yields an empty line between the two markers.
    """
    b64 = base64.b64encode(der).decode('ascii')
    wrapped = '\n'.join(b64[pos:pos + 64] for pos in range(0, len(b64), 64))
    return f'-----BEGIN {name}-----\n{wrapped}\n-----END {name}-----'
def CreateOCSPRequestDer(issuer_cert_pem, cert_pem):
    '''Uses OpenSSL to generate a basic OCSPRequest for |cert_pem|.

    Args:
        issuer_cert_pem: PEM text (str) of the issuing certificate.
        cert_pem: PEM text (str) of the certificate to ask about.

    Returns:
        The bytes that `openssl ocsp -no_nonce ... -reqout` wrote to the
        request file.

    Raises:
        subprocess.CalledProcessError: If openssl exits with a non-zero
            status (its output is captured on the exception).
    '''
    # delete_on_close=False lets the files be closed and then re-opened by
    # name (by openssl and by the read below); they are removed when the
    # `with` block exits.
    with tempfile.NamedTemporaryFile(
            delete_on_close=False, prefix="issuer_", suffix=".pem"
    ) as issuer, tempfile.NamedTemporaryFile(
            delete_on_close=False, prefix="cert_", suffix=".pem"
    ) as cert, tempfile.NamedTemporaryFile(
            delete_on_close=False, prefix="request_", suffix=".der"
    ) as request:
        issuer.write(issuer_cert_pem.encode('utf-8'))
        issuer.close()
        cert.write(cert_pem.encode('utf-8'))
        cert.close()
        # Only the name of the request file is needed; openssl writes to it.
        request.close()

        subprocess.run([
            "openssl", "ocsp", "-no_nonce", "-issuer", issuer.name, "-cert",
            cert.name, "-reqout", request.name
        ], capture_output=True, check=True)

        with open(request.name, "rb") as f:
            return f.read()
def Store(fname, description, ca, data_der):
    """Write one test-data file named '<fname>.pem'.

    The file holds, in order: the description, the OCSP response, the CA
    certificate (with its PEM label changed to "CA CERTIFICATE"), the
    certificate for CERT, and an OCSP request for CERT generated with openssl.

    Args:
        fname: Output file name without the '.pem' suffix.
        description: Free-text first line(s) of the file.
        ca: Tuple returned by CreateCert for the issuing CA; ca[1] is
            serialized as the CA certificate.
        data_der: Bytes stored as the "OCSP RESPONSE" PEM block.
    """
    pem_encoding = serialization.Encoding.PEM
    ca_cert_pem = ca[1].public_bytes(pem_encoding).decode('ascii')
    cert_pem = CERT[1].public_bytes(pem_encoding).decode('ascii')
    ocsp_request_der = CreateOCSPRequestDer(ca_cert_pem, cert_pem)

    response_block = MakePemBlock(data_der, "OCSP RESPONSE")
    ca_block = ca_cert_pem.replace('CERTIFICATE', 'CA CERTIFICATE')
    request_block = MakePemBlock(ocsp_request_der, "OCSP REQUEST")

    # Build the full text first so the file is only opened once it is ready.
    sections = (description, response_block, ca_block, cert_pem,
                request_block)
    out = '%s\n%s\n%s\n\n%s\n%s' % sections

    with open('%s.pem' % fname, 'w') as out_file:
        out_file.write(out)
# Responses that differ from the default at the top level: an empty response
# list, non-zero status codes, a different responseType OID, and a fixed
# four-byte signature supplied in place of a computed one.
Store(
'no_response',
'No SingleResponses attached to the response',
CA,
Create(responses=[]))
# A non-zero response_status makes Create return a status-only response.
Store(
'malformed_request',
'Has a status of MALFORMED_REQUEST',
CA,
Create(response_status=1))
Store(
'bad_status',
'Has an invalid status larger than the defined Status enumeration',
CA,
Create(response_status=17))
Store(
'bad_ocsp_type',
'Has an invalid OCSP OID',
CA,
Create(response_type='1.3.6.1.5.5.7.48.1.2'))
Store(
'bad_signature',
'Has an invalid signature',
CA,
Create(signature=b'\xde\xad\xbe\xef'))
# Variations of who signs the response and which certificates are embedded in
# it. CA_LINK carries the OCSP_SIGNING extended key usage; CA_BADLINK does not.
Store('ocsp_sign_direct', 'Signed directly by the issuer', CA,
Create(signer=CA, certs=[]))
Store('ocsp_sign_indirect', 'Signed indirectly through an intermediate', CA,
Create(signer=CA_LINK, certs=[CA_LINK]))
# Signed by CA_LINK, but CA_LINK's certificate is not embedded.
Store('ocsp_sign_indirect_missing',
'Signed indirectly through a missing intermediate', CA,
Create(signer=CA_LINK, certs=[]))
Store('ocsp_sign_bad_indirect',
'Signed through an intermediate without the correct key usage', CA,
Create(signer=CA_BADLINK, certs=[CA_BADLINK]))
Store('ocsp_extra_certs', 'Includes extra certs', CA,
Create(signer=CA, certs=[CA, CA_LINK]))
# Variations of the ResponseData header fields: version, responderID form,
# and response-level extensions.
# NOTE(review): version=1 equals Create's default, and Create only writes the
# 'version' field when version != 1 -- confirm this produces the intended data.
Store('has_version', 'Includes a default version V1', CA, Create(version=1))
Store(
'responder_name',
'Uses byName to identify the signer',
CA,
Create(responder=GetName(CA)))
# TODO(eroman): pyasn1 module has a bug in rfc2560.ResponderID() that will use
# IMPLICIT rather than EXPLICIT tagging for byKey
# (https://github.com/etingof/pyasn1-modules/issues/8). If using an affected
# version of the library you will need to patch pyasn1_modules/rfc2560.py and
# replace "implicitTag" with "explicitTag" in ResponderID to generate this
# test data correctly.
Store(
'responder_id',
'Uses byKey to identify the signer',
CA,
Create(responder=GetKeyHash(CA)))
# EXTENSION is the default, non-critical extension with OID 1.2.3.4.
Store(
'has_extension',
'Includes an x509v3 extension',
CA,
Create(extensions=[EXTENSION]))
# Variations of the certificate status carried in the SingleResponse(s).
# The second positional argument of CreateSingleResponse is the status:
# 0 = good, 1 = revoked, anything else = unknown.
Store(
'good_response',
'Is a valid response for the cert',
CA,
Create(responses=[CreateSingleResponse(CERT, 0)]))
Store('good_response_sha256',
'Is a valid response for the cert with a SHA256 signature', CA,
Create(responses=[CreateSingleResponse(CERT, 0)], sigAlg='sha256'))
Store(
'good_response_next_update',
'Is a valid response for the cert until nextUpdate',
CA,
Create(responses=[CreateSingleResponse(CERT, 0, next_update=NEXT_DATE)]))
# Revoked with the default revocation time (REVOKE_DATE) and no reason.
Store(
'revoke_response',
'Is a REVOKE response for the cert',
CA,
Create(responses=[CreateSingleResponse(CERT, 1)]))
Store(
'revoke_response_reason',
'Is a REVOKE response for the cert with a reason',
CA,
Create(responses=[
CreateSingleResponse(CERT, 1, revoke_time=REVOKE_DATE, reason=1)
]))
Store(
'unknown_response',
'Is an UNKNOWN response for the cert',
CA,
Create(responses=[CreateSingleResponse(CERT, 2)]))
# Two SingleResponses for the same certificate: good, then unknown.
Store(
'multiple_response',
'Has multiple responses for the cert',
CA,
Create(responses=[
CreateSingleResponse(CERT, 0),
CreateSingleResponse(CERT, 2)
]))
# Both SingleResponses describe JUNK_CERT, while Store still writes CERT and
# an OCSP request for CERT into the file.
Store(
'other_response',
'Is a response for a different cert',
CA,
Create(responses=[
CreateSingleResponse(JUNK_CERT, 0),
CreateSingleResponse(JUNK_CERT, 1)
]))
# Extensions, critical and non-critical, attached either to a SingleResponse
# or to the ResponseData.
Store(
'has_single_extension',
'Has an extension in the SingleResponse',
CA,
Create(responses=[
CreateSingleResponse(CERT, 0, extensions=[CreateExtension()])
]))
Store(
'has_critical_single_extension',
'Has a critical extension in the SingleResponse', CA,
Create(responses=[
CreateSingleResponse(
CERT, 0, extensions=[CreateExtension('1.2.3.4', critical=True)])
]))
Store(
'has_critical_response_extension',
'Has a critical extension in the ResponseData', CA,
Create(
responses=[CreateSingleResponse(CERT, 0)],
extensions=[CreateExtension('1.2.3.4', critical=True)]))
# Same as the critical single-extension case above, but using the OID
# 1.3.6.1.4.1.11129.2.4.5, which the description calls a CT extension.
Store(
'has_critical_ct_extension',
'Has a critical CT extension in the SingleResponse', CA,
Create(responses=[
CreateSingleResponse(
CERT,
0,
extensions=[
CreateExtension('1.3.6.1.4.1.11129.2.4.5', critical=True)
])
]))
# Passes the same effective arguments to Create as 'no_response' does
# (response_status=0 is the default).
Store('missing_response', 'Missing a response for the cert', CA,
Create(response_status=0, responses=[]))
# Time-based cases. The thisUpdate/nextUpdate values are offsets from
# VERIFY_DATE (2017-03-05), which the descriptions call "the current time".
Store('stale_response', 'nextUpdate is before the current time', CA,
Create(responses=[
CreateSingleResponse(
CERT,
status=0,
this_update=VERIFY_DATE - datetime.timedelta(days=2),
next_update=VERIFY_DATE - datetime.timedelta(days=1),
),
]))
Store('future_response', 'thisUpdate is after the current time', CA,
Create(responses=[
CreateSingleResponse(
CERT,
status=0,
this_update=VERIFY_DATE + datetime.timedelta(days=1),
),
]))
Store('old_response', 'thisUpdate is over a week before the current time', CA,
Create(responses=[
CreateSingleResponse(
CERT,
status=0,
this_update=VERIFY_DATE - datetime.timedelta(days=8),
),
]))
# producedAt set one day outside the CERT_DATE..CERT_EXPIRE validity window.
Store('produced_early_response', 'producedAt is before the cert\'s notBefore',
CA,
Create(responses=[CreateSingleResponse(CERT, 0)],
produced_at=CERT_DATE - datetime.timedelta(days=1)))
Store('produced_late_response', 'producedAt is after the cert\'s notAfter',
CA,
Create(responses=[CreateSingleResponse(CERT, 0)],
produced_at=CERT_EXPIRE + datetime.timedelta(days=1)))
# Stores the raw bytes b'invalid' directly instead of calling Create.
Store('invalid_response', 'OCSPResponse cannot be parsed', CA, b'invalid')
# Create embeds the OctetString b'invalid' as tbsResponseData.
Store('invalid_response_data', 'ResponseData cannot be parsed', CA,
Create(invalid_response_data=True))
# Two SingleResponses for the same certificate: good, then revoked.
Store(
'multiple_response_good_revoked',
'Has both a good and a revoked response for the cert',
CA,
Create(responses=[
CreateSingleResponse(CERT, 0),
CreateSingleResponse(CERT, 1),
]))