1672 lines
61 KiB
Python
Raw Normal View History

2024-10-11 18:43:02 +03:00
#!/usr/bin/env python3
"""
Generates test cases that aim to validate name constraints and other
name-related parts of webpki.
Run this script from tests/. It edits the bottom part of some .rs files and
drops testcase data into subdirectories as required.
"""
import argparse
import os
from typing import TextIO, Optional, Union, Any, Callable, Iterable, List
from pathlib import Path
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, ec, ed25519, padding
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.backends import default_backend
from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID
import ipaddress
import datetime
import subprocess
ROOT_PRIVATE_KEY: rsa.RSAPrivateKey = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
ROOT_PUBLIC_KEY: rsa.RSAPublicKey = ROOT_PRIVATE_KEY.public_key()
NOT_BEFORE: datetime.datetime = datetime.datetime.utcfromtimestamp(0x1FEDF00D - 30)
NOT_AFTER: datetime.datetime = datetime.datetime.utcfromtimestamp(0x1FEDF00D + 30)
ANY_PRIV_KEY = Union[
ed25519.Ed25519PrivateKey | ec.EllipticCurvePrivateKey | rsa.RSAPrivateKey
]
ANY_PUB_KEY = Union[
ed25519.Ed25519PublicKey | ec.EllipticCurvePublicKey | rsa.RSAPublicKey
]
SIGNER = Callable[
[Any, bytes], Any
] # Note: a bit loosey-goosey here but good enough for tests.
def trim_top(file_name: str) -> TextIO:
"""
Reads `file_name`, then writes lines up to a particular comment (the "top"
of the file) back to it and returns the file object for further writing.
"""
with open(file_name, "r") as f:
top = f.readlines()
top = top[: top.index("// DO NOT EDIT BELOW: generated by tests/generate.py\n") + 1]
output = open(file_name, "w")
for line in top:
output.write(line)
return output
def key_or_generate(key: Optional[ANY_PRIV_KEY] = None) -> ANY_PRIV_KEY:
return (
key
if key is not None
else rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend(),
)
)
def write_der(path: str, content: bytes, force: bool) -> None:
# Avoid churn from regenerating existing on-disk resources unless force is enabled.
out_path = Path(path)
if out_path.exists() and not force:
return None
with out_path.open("wb") as f:
f.write(content)
def end_entity_cert(
*,
subject_name: x509.Name,
issuer_name: x509.Name,
issuer_key: Optional[ANY_PRIV_KEY] = None,
subject_key: Optional[ANY_PRIV_KEY] = None,
sans: Optional[Iterable[x509.GeneralName]] = None,
ekus: Optional[Iterable[x509.ObjectIdentifier]] = None,
serial: Optional[int] = None,
) -> x509.Certificate:
subject_priv_key = key_or_generate(subject_key)
subject_key_pub: ANY_PUB_KEY = subject_priv_key.public_key()
ee_builder: x509.CertificateBuilder = x509.CertificateBuilder()
ee_builder = ee_builder.subject_name(subject_name)
ee_builder = ee_builder.issuer_name(issuer_name)
ee_builder = ee_builder.not_valid_before(NOT_BEFORE)
ee_builder = ee_builder.not_valid_after(NOT_AFTER)
ee_builder = ee_builder.serial_number(
x509.random_serial_number() if serial is None else serial
)
ee_builder = ee_builder.public_key(subject_key_pub)
if sans:
ee_builder = ee_builder.add_extension(
x509.SubjectAlternativeName(sans), critical=False
)
if ekus:
ee_builder = ee_builder.add_extension(
x509.ExtendedKeyUsage(ekus), critical=False
)
ee_builder = ee_builder.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
)
return ee_builder.sign(
private_key=issuer_key if issuer_key is not None else ROOT_PRIVATE_KEY,
algorithm=hashes.SHA256(),
backend=default_backend(),
)
def subject_name_for_test(subject_cn: str, test_name: str) -> x509.Name:
return x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, subject_cn),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, test_name),
]
)
def issuer_name_for_test(test_name: str) -> x509.Name:
return x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, "issuer.example.com"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, test_name),
]
)
def ca_cert(
*,
subject_name: x509.Name,
subject_key: Optional[ANY_PRIV_KEY] = None,
issuer_name: Optional[x509.Name] = None,
issuer_key: Optional[ANY_PRIV_KEY] = None,
permitted_subtrees: Optional[Iterable[x509.GeneralName]] = None,
excluded_subtrees: Optional[Iterable[x509.GeneralName]] = None,
key_usage: Optional[x509.KeyUsage] = None,
) -> x509.Certificate:
subject_priv_key = key_or_generate(subject_key)
subject_key_pub: ANY_PUB_KEY = subject_priv_key.public_key()
ca_builder: x509.CertificateBuilder = x509.CertificateBuilder()
ca_builder = ca_builder.subject_name(subject_name)
ca_builder = ca_builder.issuer_name(issuer_name if issuer_name else subject_name)
ca_builder = ca_builder.not_valid_before(NOT_BEFORE)
ca_builder = ca_builder.not_valid_after(NOT_AFTER)
ca_builder = ca_builder.serial_number(x509.random_serial_number())
ca_builder = ca_builder.public_key(subject_key_pub)
ca_builder = ca_builder.add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True,
)
if permitted_subtrees is not None or excluded_subtrees is not None:
ca_builder = ca_builder.add_extension(
x509.NameConstraints(permitted_subtrees, excluded_subtrees), critical=True
)
if key_usage is not None:
ca_builder = ca_builder.add_extension(
key_usage,
critical=True,
)
return ca_builder.sign(
private_key=issuer_key if issuer_key else subject_priv_key,
algorithm=hashes.SHA256(),
backend=default_backend(),
)
def generate_tls_server_cert_test(
output: TextIO,
test_name: str,
expected_error: Optional[str] = None,
subject_common_name: Optional[str] = None,
extra_subject_names: Optional[List[x509.NameAttribute]] = None,
valid_names: Optional[List[str]] = None,
invalid_names: Optional[List[str]] = None,
sans: Optional[Iterable[x509.GeneralName]] = None,
permitted_subtrees: Optional[Iterable[x509.GeneralName]] = None,
excluded_subtrees: Optional[Iterable[x509.GeneralName]] = None,
force: bool = False,
) -> None:
"""
Generate a test case, writing a rust '#[test]' function into
tls_server_certs.rs, and writing supporting files into the current
directory.
- `test_name`: name of the test, must be a rust identifier.
- `expected_error`: item in `webpki::Error` enum, expected error from
webpki `verify_is_valid_tls_server_cert` function. Leave absent to
expect success.
- `subject_common_name`: optional string to put in end-entity certificate
subject common name.
- `extra_subject_names`: optional sequence of `x509.NameAttributes` to add
to end-entity certificate subject.
- `valid_names`: optional sequence of valid names that the end-entity
certificate is expected to pass `verify_is_valid_for_subject_name` for.
- `invalid_names`: optional sequence of invalid names that the end-entity
certificate is expected to fail `verify_is_valid_for_subject_name` with
`CertNotValidForName`.
- `sans`: optional sequence of `x509.GeneralName`s that are the contents of
the subjectAltNames extension. If empty or not provided the end-entity
certificate does not have a subjectAltName extension.
- `permitted_subtrees`: optional sequence of `x509.GeneralName`s that are
the `permittedSubtrees` contents of the `nameConstraints` extension.
If this and `excluded_subtrees` are empty/absent then the end-entity
certificate does not have a `nameConstraints` extension.
- `excluded_subtrees`: optional sequence of `x509.GeneralName`s that are
the `excludedSubtrees` contents of the `nameConstraints` extension.
If this and `permitted_subtrees` are both empty/absent then the
end-entity certificate does not have a `nameConstraints` extension.
"""
if invalid_names is None:
invalid_names = []
if valid_names is None:
valid_names = []
if extra_subject_names is None:
extra_subject_names = []
issuer_name: x509.Name = issuer_name_for_test(test_name)
# end-entity
ee_subject = x509.Name(
(
[x509.NameAttribute(NameOID.COMMON_NAME, subject_common_name)]
if subject_common_name
else []
)
+ [x509.NameAttribute(NameOID.ORGANIZATION_NAME, test_name)]
+ extra_subject_names
)
ee_certificate: x509.Certificate = end_entity_cert(
subject_name=ee_subject,
issuer_name=issuer_name,
sans=sans,
)
output_dir: str = "tls_server_certs"
ee_cert_path: str = os.path.join(output_dir, f"{test_name}.ee.der")
ca_cert_path: str = os.path.join(output_dir, f"{test_name}.ca.der")
if not os.path.isdir(output_dir):
os.mkdir(output_dir)
write_der(ee_cert_path, ee_certificate.public_bytes(Encoding.DER), force)
# issuer
ca: x509.Certificate = ca_cert(
subject_name=issuer_name,
subject_key=ROOT_PRIVATE_KEY,
permitted_subtrees=permitted_subtrees,
excluded_subtrees=excluded_subtrees,
)
write_der(ca_cert_path, ca.public_bytes(Encoding.DER), force)
expected: str = ""
if expected_error is None:
expected = "Ok(())"
else:
expected = "Err(webpki::Error::" + expected_error + ")"
valid_names_str: str = ", ".join('"' + name + '"' for name in valid_names)
invalid_names_str: str = ", ".join('"' + name + '"' for name in invalid_names)
print(
"""
#[test]
fn %(test_name)s() {
let ee = include_bytes!("%(ee_cert_path)s");
let ca = include_bytes!("%(ca_cert_path)s");
assert_eq!(
check_cert(ee, ca, &[%(valid_names_str)s], &[%(invalid_names_str)s]),
%(expected)s
);
}"""
% locals(),
file=output,
)
def tls_server_certs(force: bool) -> None:
with trim_top("tls_server_certs.rs") as output:
generate_tls_server_cert_test(
output,
"no_name_constraints",
subject_common_name="subject.example.com",
valid_names=["dns.example.com"],
invalid_names=["subject.example.com"],
sans=[x509.DNSName("dns.example.com")],
)
generate_tls_server_cert_test(
output,
"additional_dns_labels",
subject_common_name="subject.example.com",
valid_names=["host1.example.com", "host2.example.com"],
invalid_names=["subject.example.com"],
sans=[x509.DNSName("host1.example.com"), x509.DNSName("host2.example.com")],
permitted_subtrees=[x509.DNSName(".example.com")],
)
generate_tls_server_cert_test(
output,
"disallow_dns_san",
expected_error="NameConstraintViolation",
sans=[x509.DNSName("disallowed.example.com")],
excluded_subtrees=[x509.DNSName("disallowed.example.com")],
)
generate_tls_server_cert_test(
output,
"allow_subject_common_name",
subject_common_name="allowed.example.com",
invalid_names=["allowed.example.com"],
permitted_subtrees=[x509.DNSName("allowed.example.com")],
)
generate_tls_server_cert_test(
output,
"allow_dns_san",
valid_names=["allowed.example.com"],
sans=[x509.DNSName("allowed.example.com")],
permitted_subtrees=[x509.DNSName("allowed.example.com")],
)
generate_tls_server_cert_test(
output,
"allow_dns_san_and_subject_common_name",
valid_names=["allowed-san.example.com"],
invalid_names=["allowed-cn.example.com"],
sans=[x509.DNSName("allowed-san.example.com")],
subject_common_name="allowed-cn.example.com",
permitted_subtrees=[
x509.DNSName("allowed-san.example.com"),
x509.DNSName("allowed-cn.example.com"),
],
)
generate_tls_server_cert_test(
output,
"disallow_dns_san_and_allow_subject_common_name",
expected_error="NameConstraintViolation",
sans=[
x509.DNSName("allowed-san.example.com"),
x509.DNSName("disallowed-san.example.com"),
],
subject_common_name="allowed-cn.example.com",
permitted_subtrees=[
x509.DNSName("allowed-san.example.com"),
x509.DNSName("allowed-cn.example.com"),
],
excluded_subtrees=[x509.DNSName("disallowed-san.example.com")],
)
# XXX: ideally this test case would be a negative one, because the name constraints
# should apply to the subject name.
# however, because we don't look at email addresses in subjects, it is accepted.
generate_tls_server_cert_test(
output,
"we_incorrectly_ignore_name_constraints_on_name_in_subject",
extra_subject_names=[
x509.NameAttribute(NameOID.EMAIL_ADDRESS, "joe@notexample.com")
],
permitted_subtrees=[x509.RFC822Name("example.com")],
)
# this does work, however, because we process all SANs
generate_tls_server_cert_test(
output,
"reject_constraints_on_unimplemented_names",
expected_error="NameConstraintViolation",
sans=[x509.RFC822Name("joe@example.com")],
permitted_subtrees=[x509.RFC822Name("example.com")],
)
# RFC5280 4.2.1.10:
# "If no name of the type is in the certificate,
# the certificate is acceptable."
generate_tls_server_cert_test(
output,
"we_ignore_constraints_on_names_that_do_not_appear_in_cert",
sans=[x509.DNSName("notexample.com")],
valid_names=["notexample.com"],
invalid_names=["example.com"],
permitted_subtrees=[x509.RFC822Name("example.com")],
)
generate_tls_server_cert_test(
output,
"wildcard_san_accepted_if_in_subtree",
sans=[x509.DNSName("*.example.com")],
valid_names=["bob.example.com", "jane.example.com"],
invalid_names=["example.com", "uh.oh.example.com"],
permitted_subtrees=[x509.DNSName("example.com")],
)
generate_tls_server_cert_test(
output,
"wildcard_san_rejected_if_in_excluded_subtree",
expected_error="NameConstraintViolation",
sans=[x509.DNSName("*.example.com")],
excluded_subtrees=[x509.DNSName("example.com")],
)
generate_tls_server_cert_test(
output,
"ip4_address_san_rejected_if_in_excluded_subtree",
expected_error="NameConstraintViolation",
sans=[x509.IPAddress(ipaddress.ip_address("12.34.56.78"))],
excluded_subtrees=[x509.IPAddress(ipaddress.ip_network("12.34.56.0/24"))],
)
generate_tls_server_cert_test(
output,
"ip4_address_san_allowed_if_outside_excluded_subtree",
valid_names=["12.34.56.78"],
sans=[x509.IPAddress(ipaddress.ip_address("12.34.56.78"))],
excluded_subtrees=[x509.IPAddress(ipaddress.ip_network("12.34.56.252/30"))],
)
sparse_net_addr = ipaddress.ip_network("12.34.56.78/24", strict=False)
sparse_net_addr.netmask = ipaddress.ip_address("255.255.255.1")
generate_tls_server_cert_test(
output,
"ip4_address_san_rejected_if_excluded_is_sparse_cidr_mask",
expected_error="InvalidNetworkMaskConstraint",
sans=[
# inside excluded network, if netmask is allowed to be sparse
x509.IPAddress(ipaddress.ip_address("12.34.56.79")),
],
excluded_subtrees=[x509.IPAddress(sparse_net_addr)],
)
generate_tls_server_cert_test(
output,
"ip4_address_san_allowed",
valid_names=["12.34.56.78"],
invalid_names=[
"12.34.56.77",
"12.34.56.79",
"0000:0000:0000:0000:0000:ffff:0c22:384e",
],
sans=[x509.IPAddress(ipaddress.ip_address("12.34.56.78"))],
permitted_subtrees=[x509.IPAddress(ipaddress.ip_network("12.34.56.0/24"))],
)
generate_tls_server_cert_test(
output,
"ip6_address_san_rejected_if_in_excluded_subtree",
expected_error="NameConstraintViolation",
sans=[x509.IPAddress(ipaddress.ip_address("2001:db8::1"))],
excluded_subtrees=[x509.IPAddress(ipaddress.ip_network("2001:db8::/48"))],
)
generate_tls_server_cert_test(
output,
"ip6_address_san_allowed_if_outside_excluded_subtree",
valid_names=["2001:0db9:0000:0000:0000:0000:0000:0001"],
sans=[x509.IPAddress(ipaddress.ip_address("2001:db9::1"))],
excluded_subtrees=[x509.IPAddress(ipaddress.ip_network("2001:db8::/48"))],
)
generate_tls_server_cert_test(
output,
"ip6_address_san_allowed",
valid_names=["2001:0db9:0000:0000:0000:0000:0000:0001"],
invalid_names=["12.34.56.78"],
sans=[x509.IPAddress(ipaddress.ip_address("2001:db9::1"))],
permitted_subtrees=[x509.IPAddress(ipaddress.ip_network("2001:db9::/48"))],
)
generate_tls_server_cert_test(
output,
"ip46_mixed_address_san_allowed",
valid_names=["12.34.56.78", "2001:0db9:0000:0000:0000:0000:0000:0001"],
invalid_names=[
"12.34.56.77",
"12.34.56.79",
"0000:0000:0000:0000:0000:ffff:0c22:384e",
],
sans=[
x509.IPAddress(ipaddress.ip_address("12.34.56.78")),
x509.IPAddress(ipaddress.ip_address("2001:db9::1")),
],
permitted_subtrees=[
x509.IPAddress(ipaddress.ip_network("12.34.56.0/24")),
x509.IPAddress(ipaddress.ip_network("2001:db9::/48")),
],
)
generate_tls_server_cert_test(
output,
"permit_directory_name_not_implemented",
expected_error="NameConstraintViolation",
permitted_subtrees=[
x509.DirectoryName(
x509.Name([x509.NameAttribute(NameOID.COUNTRY_NAME, "CN")])
)
],
)
generate_tls_server_cert_test(
output,
"exclude_directory_name_not_implemented",
expected_error="NameConstraintViolation",
excluded_subtrees=[
x509.DirectoryName(
x509.Name([x509.NameAttribute(NameOID.COUNTRY_NAME, "CN")])
)
],
)
generate_tls_server_cert_test(
output,
"invalid_dns_name_matching",
valid_names=["dns.example.com"],
subject_common_name="subject.example.com",
sans=[
x509.DNSName("{invalid}.example.com"),
x509.DNSName("dns.example.com"),
],
)
def signatures(force: bool) -> None:
rsa_pub_exponent: int = 0x10001
backend: Any = default_backend()
all_key_types: dict[str, ANY_PRIV_KEY] = {
"ed25519": ed25519.Ed25519PrivateKey.generate(),
"ecdsa_p256": ec.generate_private_key(ec.SECP256R1(), backend),
"ecdsa_p384": ec.generate_private_key(ec.SECP384R1(), backend),
"ecdsa_p521_not_supported": ec.generate_private_key(ec.SECP521R1(), backend),
"rsa_1024_not_supported": rsa.generate_private_key(
rsa_pub_exponent, 1024, backend
),
"rsa_2048": rsa.generate_private_key(rsa_pub_exponent, 2048, backend),
"rsa_3072": rsa.generate_private_key(rsa_pub_exponent, 3072, backend),
"rsa_4096": rsa.generate_private_key(rsa_pub_exponent, 4096, backend),
}
rsa_types: list[str] = [
"RSA_PKCS1_2048_8192_SHA256",
"RSA_PKCS1_2048_8192_SHA384",
"RSA_PKCS1_2048_8192_SHA512",
"RSA_PSS_2048_8192_SHA256_LEGACY_KEY",
"RSA_PSS_2048_8192_SHA384_LEGACY_KEY",
"RSA_PSS_2048_8192_SHA512_LEGACY_KEY",
]
webpki_algs: dict[str, Iterable[str]] = {
"ed25519": ["ED25519"],
"ecdsa_p256": ["ECDSA_P256_SHA384", "ECDSA_P256_SHA256"],
"ecdsa_p384": ["ECDSA_P384_SHA384", "ECDSA_P384_SHA256"],
"rsa_2048": rsa_types,
"rsa_3072": rsa_types + ["RSA_PKCS1_3072_8192_SHA384"],
"rsa_4096": rsa_types + ["RSA_PKCS1_3072_8192_SHA384"],
}
pss_sha256: padding.PSS = padding.PSS(
mgf=padding.MGF1(hashes.SHA256()), salt_length=32
)
pss_sha384: padding.PSS = padding.PSS(
mgf=padding.MGF1(hashes.SHA384()), salt_length=48
)
pss_sha512: padding.PSS = padding.PSS(
mgf=padding.MGF1(hashes.SHA512()), salt_length=64
)
how_to_sign: dict[str, SIGNER] = {
"ED25519": lambda key, message: key.sign(message),
"ECDSA_P256_SHA256": lambda key, message: key.sign(
message, ec.ECDSA(hashes.SHA256())
),
"ECDSA_P256_SHA384": lambda key, message: key.sign(
message, ec.ECDSA(hashes.SHA384())
),
"ECDSA_P384_SHA256": lambda key, message: key.sign(
message, ec.ECDSA(hashes.SHA256())
),
"ECDSA_P384_SHA384": lambda key, message: key.sign(
message, ec.ECDSA(hashes.SHA384())
),
"RSA_PKCS1_2048_8192_SHA256": lambda key, message: key.sign(
message, padding.PKCS1v15(), hashes.SHA256()
),
"RSA_PKCS1_2048_8192_SHA384": lambda key, message: key.sign(
message, padding.PKCS1v15(), hashes.SHA384()
),
"RSA_PKCS1_2048_8192_SHA512": lambda key, message: key.sign(
message, padding.PKCS1v15(), hashes.SHA512()
),
"RSA_PKCS1_3072_8192_SHA384": lambda key, message: key.sign(
message, padding.PKCS1v15(), hashes.SHA384()
),
"RSA_PSS_2048_8192_SHA256_LEGACY_KEY": lambda key, message: key.sign(
message, pss_sha256, hashes.SHA256()
),
"RSA_PSS_2048_8192_SHA384_LEGACY_KEY": lambda key, message: key.sign(
message, pss_sha384, hashes.SHA384()
),
"RSA_PSS_2048_8192_SHA512_LEGACY_KEY": lambda key, message: key.sign(
message, pss_sha512, hashes.SHA512()
),
}
output_dir: str = "signatures"
if not os.path.isdir(output_dir):
os.mkdir(output_dir)
message = b"hello world!"
message_path: str = os.path.join(output_dir, "message.bin")
write_der(message_path, message, force)
def _cert_path(cert_type: str) -> str:
return os.path.join(output_dir, f"{cert_type}.ee.der")
for name, private_key in all_key_types.items():
ee_subject = x509.Name(
[x509.NameAttribute(NameOID.ORGANIZATION_NAME, name + " test")]
)
issuer_subject = x509.Name(
[x509.NameAttribute(NameOID.ORGANIZATION_NAME, name + " issuer")]
)
certificate: x509.Certificate = end_entity_cert(
subject_name=ee_subject,
subject_key=private_key,
issuer_name=issuer_subject,
)
write_der(_cert_path(name), certificate.public_bytes(Encoding.DER), force)
def _test(
test_name: str, cert_type: str, algorithm: str, signature: bytes, expected: str
) -> None:
nonlocal message_path
cert_path: str = _cert_path(cert_type)
lower_test_name: str = test_name.lower()
sig_path: str = os.path.join(output_dir, f"{lower_test_name}.sig.bin")
write_der(sig_path, signature, force)
print(
"""
#[test]
#[cfg(feature = "alloc")]
fn %(lower_test_name)s() {
let ee = include_bytes!("%(cert_path)s");
let message = include_bytes!("%(message_path)s");
let signature = include_bytes!("%(sig_path)s");
assert_eq!(
check_sig(ee, &webpki::%(algorithm)s, message, signature),
%(expected)s
);
}"""
% locals(),
file=output,
)
def good_signature(
test_name: str, cert_type: str, algorithm: str, signer: SIGNER
) -> None:
signature: bytes = signer(all_key_types[cert_type], message)
_test(test_name, cert_type, algorithm, signature, expected="Ok(())")
def good_signature_but_rejected(
test_name: str, cert_type: str, algorithm: str, signer: SIGNER
) -> None:
signature: bytes = signer(all_key_types[cert_type], message)
_test(
test_name,
cert_type,
algorithm,
signature,
expected="Err(webpki::Error::InvalidSignatureForPublicKey)",
)
def bad_signature(
test_name: str, cert_type: str, algorithm: str, signer: SIGNER
) -> None:
signature: bytes = signer(all_key_types[cert_type], message + b"?")
_test(
test_name,
cert_type,
algorithm,
signature,
expected="Err(webpki::Error::InvalidSignatureForPublicKey)",
)
def bad_algorithms_for_key(
test_name: str, cert_type: str, unusable_algs: set[str]
) -> None:
cert_path: str = _cert_path(cert_type)
test_name_lower: str = test_name.lower()
unusable_algs_str: str = ", ".join(
"&webpki::" + alg for alg in sorted(unusable_algs)
)
print(
"""
#[test]
#[cfg(feature = "alloc")]
fn %(test_name_lower)s() {
let ee = include_bytes!("%(cert_path)s");
for algorithm in &[ %(unusable_algs_str)s ] {
assert_eq!(
check_sig(ee, algorithm, b"", b""),
Err(webpki::Error::UnsupportedSignatureAlgorithmForPublicKey)
);
}
}"""
% locals(),
file=output,
)
with trim_top("signatures.rs") as output:
# compute all webpki algorithms covered by these tests
all_webpki_algs: set[str] = set(
[item for algs in webpki_algs.values() for item in algs]
)
for type, algs in webpki_algs.items():
for alg in algs:
signer: SIGNER = how_to_sign[alg]
good_signature(
type + "_key_and_" + alg + "_good_signature",
cert_type=type,
algorithm=alg,
signer=signer,
)
bad_signature(
type + "_key_and_" + alg + "_detects_bad_signature",
cert_type=type,
algorithm=alg,
signer=signer,
)
unusable_algs = set(all_webpki_algs)
for alg in algs:
unusable_algs.remove(alg)
# special case: tested separately below
if type == "rsa_2048":
unusable_algs.remove("RSA_PKCS1_3072_8192_SHA384")
bad_algorithms_for_key(
type + "_key_rejected_by_other_algorithms",
cert_type=type,
unusable_algs=unusable_algs,
)
good_signature_but_rejected(
"rsa_2048_key_rejected_by_RSA_PKCS1_3072_8192_SHA384",
cert_type="rsa_2048",
algorithm="RSA_PKCS1_3072_8192_SHA384",
signer=signer,
)
def generate_client_auth_test(
output: TextIO,
test_name: str,
ekus: Optional[Iterable[x509.ObjectIdentifier]],
expected_error: Optional[str] = None,
force: bool = False,
) -> None:
issuer_name: x509.Name = issuer_name_for_test(test_name)
# end-entity
ee_subject: x509.Name = x509.Name(
[x509.NameAttribute(NameOID.ORGANIZATION_NAME, test_name)]
)
ee_certificate: x509.Certificate = end_entity_cert(
subject_name=ee_subject,
ekus=ekus,
issuer_name=issuer_name,
)
output_dir: str = "client_auth"
if not os.path.isdir(output_dir):
os.mkdir(output_dir)
ee_cert_path: str = os.path.join(output_dir, f"{test_name}.ee.der")
write_der(ee_cert_path, ee_certificate.public_bytes(Encoding.DER), force)
# issuer
ca: x509.Certificate = ca_cert(
subject_name=issuer_name, subject_key=ROOT_PRIVATE_KEY
)
ca_cert_path: str = os.path.join(output_dir, f"{test_name}.ca.der")
write_der(ca_cert_path, ca.public_bytes(Encoding.DER), force)
expected: str = ""
if expected_error is None:
expected = "Ok(())"
else:
expected = "Err(webpki::Error::" + expected_error + ")"
print(
"""
#[test]
#[cfg(feature = "alloc")]
fn %(test_name)s() {
let ee = include_bytes!("%(ee_cert_path)s");
let ca = include_bytes!("%(ca_cert_path)s");
assert_eq!(
check_cert(ee, ca),
%(expected)s
);
}"""
% locals(),
file=output,
)
def client_auth(force: bool) -> None:
with trim_top("client_auth.rs") as output:
generate_client_auth_test(
output, "cert_with_no_eku_accepted_for_client_auth", ekus=None
)
generate_client_auth_test(
output,
"cert_with_clientauth_eku_accepted_for_client_auth",
ekus=[ExtendedKeyUsageOID.CLIENT_AUTH],
)
generate_client_auth_test(
output,
"cert_with_both_ekus_accepted_for_client_auth",
ekus=[ExtendedKeyUsageOID.CLIENT_AUTH, ExtendedKeyUsageOID.SERVER_AUTH],
)
generate_client_auth_test(
output,
"cert_with_serverauth_eku_rejected_for_client_auth",
ekus=[ExtendedKeyUsageOID.SERVER_AUTH],
expected_error="RequiredEkuNotFound",
)
def client_auth_revocation(force: bool) -> None:
output_dir: str = "client_auth_revocation"
if not os.path.isdir(output_dir):
os.mkdir(output_dir)
# KeyUsage for a CA that sets crl_sign.
crl_sign_ku = x509.KeyUsage(
digital_signature=True,
key_cert_sign=True,
crl_sign=True, # NB: crl_sign set.
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
encipher_only=False,
decipher_only=False,
)
# KeyUsage for a CA that omits crl_sign.
no_crl_sign_ku = x509.KeyUsage(
digital_signature=True,
key_cert_sign=True,
crl_sign=False, # NB: no crl_sign.
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
encipher_only=False,
decipher_only=False,
)
def _chain(
*, chain_name: str, key_usage: Optional[x509.KeyUsage]
) -> list[tuple[x509.Certificate, str, ANY_PRIV_KEY]]:
"""
Generate a short test certificate chain:
ee -> intermediate -> root.
:param chain_name: the name of the certificate chain. Used to generate subject/issuer names and to
choose the filename to write DER contents to disk.
:param key_usage: the KeyUsage to include in the issuer certificates (both the intermediate and the root).
:return: Return a list comprising the chain starting from the end entity and ending at the root trust anchor.
Each entry in the list is a tuple of three values: the x509.Certificate object, the path the DER encoding
was written to, and lastly the corresponding private key.
"""
ee_subj: x509.Name = subject_name_for_test("test.example.com", chain_name)
int_a_subj: x509.Name = issuer_name_for_test(f"int.a.{chain_name}")
int_b_subj: x509.Name = issuer_name_for_test(f"int.b.{chain_name}")
ca_subj: x509.Name = issuer_name_for_test(f"ca.{chain_name}")
ee_key: ec.EllipticCurvePrivateKey = ec.generate_private_key(
ec.SECP256R1(), default_backend()
)
int_a_key: ec.EllipticCurvePrivateKey = ec.generate_private_key(
ec.SECP256R1(), default_backend()
)
int_b_key: ec.EllipticCurvePrivateKey = ec.generate_private_key(
ec.SECP256R1(), default_backend()
)
root_key: ec.EllipticCurvePrivateKey = ec.generate_private_key(
ec.SECP256R1(), default_backend()
)
# EE cert issued by intermediate.
ee_cert: x509.Certificate = end_entity_cert(
subject_name=ee_subj, issuer_name=int_a_subj, issuer_key=int_a_key
)
ee_cert_path: str = os.path.join(output_dir, f"{chain_name}.ee.der")
write_der(ee_cert_path, ee_cert.public_bytes(Encoding.DER), force)
# Second EE cert issued by intermediate, with top-bit set in serial.
ee_cert_topbit: x509.Certificate = end_entity_cert(
subject_name=ee_subj,
issuer_name=int_a_subj,
issuer_key=int_a_key,
serial=0x80DEADBEEFF00D,
)
ee_cert_topbit_path: str = os.path.join(
output_dir, f"{chain_name}.topbit.ee.der"
)
write_der(ee_cert_topbit_path, ee_cert_topbit.public_bytes(Encoding.DER), force)
# intermediate a cert issued by intermediate b.
int_a_cert: x509.Certificate = ca_cert(
subject_name=int_a_subj,
subject_key=int_a_key,
issuer_name=int_b_subj,
issuer_key=int_b_key,
key_usage=key_usage,
)
int_a_cert_path: str = os.path.join(output_dir, f"{chain_name}.int.a.ca.der")
write_der(int_a_cert_path, int_a_cert.public_bytes(Encoding.DER), force)
# intermediate b cert issued by root cert.
int_b_cert: x509.Certificate = ca_cert(
subject_name=int_b_subj,
subject_key=int_b_key,
issuer_name=ca_subj,
issuer_key=root_key,
key_usage=key_usage,
)
int_b_cert_path: str = os.path.join(output_dir, f"{chain_name}.int.b.ca.der")
write_der(int_b_cert_path, int_b_cert.public_bytes(Encoding.DER), force)
# root cert issued by itself.
root_cert: x509.Certificate = ca_cert(
subject_name=ca_subj,
subject_key=root_key,
key_usage=key_usage,
)
root_cert_path: str = os.path.join(output_dir, f"{chain_name}.root.ca.der")
write_der(root_cert_path, root_cert.public_bytes(Encoding.DER), force)
return [
(ee_cert, ee_cert_path, ee_key),
(int_a_cert, int_a_cert_path, int_a_key),
(int_b_cert, int_b_cert_path, int_b_key),
(root_cert, root_cert_path, root_key),
(ee_cert_topbit, ee_cert_topbit_path, ee_key),
]
def _crl(
*,
serials: Iterable[int],
issuer_name: x509.Name,
issuer_key: Optional[ANY_PRIV_KEY],
) -> x509.CertificateRevocationList:
"""
Generate a certificate revocation list.
:param serials: list of serial numbers to include in the CRL as revoked certificates.
:param issuer_name: the name of the CRL issuer.
:param issuer_key: the key used to sign the CRL.
:return: a generated x509.CertificateRevocationList.
"""
issuer_priv_key: ANY_PRIV_KEY = key_or_generate(issuer_key)
crl_builder: x509.CertificateRevocationListBuilder = (
x509.CertificateRevocationListBuilder()
)
crl_builder = crl_builder.issuer_name(issuer_name)
crl_builder = crl_builder.last_update(NOT_BEFORE)
crl_builder = crl_builder.next_update(NOT_AFTER)
for serial in serials:
revoked_cert_builder: x509.RevokedCertificateBuilder = (
x509.RevokedCertificateBuilder()
)
revoked_cert_builder = revoked_cert_builder.serial_number(serial)
revoked_cert_builder = revoked_cert_builder.revocation_date(NOT_BEFORE)
revoked_cert_builder = revoked_cert_builder.add_extension(
x509.CRLReason(x509.ReasonFlags.key_compromise), critical=False
)
crl_builder = crl_builder.add_revoked_certificate(
revoked_cert_builder.build()
)
crl_builder = crl_builder.add_extension(
x509.CRLNumber(x509.random_serial_number()), critical=False
)
return crl_builder.sign(
private_key=issuer_priv_key,
algorithm=hashes.SHA256(),
)
def _revocation_test(
*,
test_name: str,
chain: list[tuple[x509.Certificate, str, ANY_PRIV_KEY]],
crl_paths: Iterable[str],
owned: bool,
expected_error: Optional[str],
ee_topbit_serial: bool = False,
) -> None:
"""
Generate a Rust unit test for a revocation checking scenario and write it to the output file.
:param test_name: the name of the unit test.
:param chain: the certificate chain to use for validation.
:param crl_paths: paths to zero or more CRLs.
:param owned: whether to use the owned or borrowed CRL representation.
:param expected_error: an optional error to expect to be returned from validation.
"""
if len(chain) != 5:
raise RuntimeError("invalid chain length")
ee_cert, ee_cert_path, _ = chain[4] if ee_topbit_serial else chain[0]
int_a_cert, int_a_cert_path, _ = chain[1]
int_b_cert, int_b_cert_path, _ = chain[2]
root_cert, root_cert_path, _ = chain[3]
int_a_str = f'include_bytes!("{int_a_cert_path}").as_slice()'
int_b_str = f'include_bytes!("{int_b_cert_path}").as_slice()'
intermediates_str: str = f"&[{int_a_str}, {int_b_str}]"
owned_convert: str = ".to_owned().unwrap()" if owned else ""
crl_includes: str = "\n".join(
[
f"""
&webpki::BorrowedCertRevocationList::from_der(include_bytes!("{path}").as_slice())
.unwrap()
{owned_convert}
as &dyn webpki::CertRevocationList,
"""
for path in crl_paths
]
)
crls: str = f"&[{crl_includes}]"
expected: str = (
f"Err(webpki::Error::{expected_error})" if expected_error else "Ok(())"
)
feature_gate = '#[cfg(feature = "alloc")]' if owned else ""
print(
"""
%(feature_gate)s
#[test]
fn %(test_name)s() {
let ee = include_bytes!("%(ee_cert_path)s");
let intermediates = %(intermediates_str)s;
let ca = include_bytes!("%(root_cert_path)s");
let crls = %(crls)s;
assert_eq!(check_cert(ee, intermediates, ca, crls), %(expected)s);
}
"""
% locals(),
file=output,
)
# Build a simple certificate chain where the issuers don't have a key usage specified.
no_ku_chain = _chain(chain_name="no_ku_chain", key_usage=None)
# Build a simple certificate chain where the issuers have key usage specified, but don't include the
# cRLSign bit.
no_crl_ku_chain = _chain(chain_name="no_crl_ku_chain", key_usage=no_crl_sign_ku)
# Build a simple certificate chain where the issuers have key usage specified, and include the cRLSign bit.
crl_ku_chain = _chain(chain_name="ku_chain", key_usage=crl_sign_ku)
def _no_crls_test_ee_depth() -> None:
# Providing no CRLs means the EE cert should verify without err.
_revocation_test(
test_name="no_crls_test_ee_depth",
chain=no_ku_chain,
crl_paths=[],
owned=False,
expected_error=None,
)
_revocation_test(
test_name="no_crls_test_ee_depth_owned",
chain=no_ku_chain,
crl_paths=[],
owned=True,
expected_error=None,
)
def _no_relevant_crl_ee_depth() -> None:
test_name = "no_relevant_crl_ee_depth"
# Generate a CRL that includes the EE cert's serial, but that is issued by an unknown issuer.
ee_cert = no_ku_chain[0][0]
no_match_crl = _crl(
serials=[ee_cert.serial_number],
issuer_name=subject_name_for_test("whatev", test_name),
issuer_key=None,
)
no_match_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(no_match_crl_path, no_match_crl.public_bytes(Encoding.DER), force)
# Providing no relevant CRL means the EE cert should verify without err.
_revocation_test(
test_name=test_name,
chain=no_ku_chain,
crl_paths=[no_match_crl_path],
owned=False,
expected_error=None,
)
_revocation_test(
test_name=test_name + "_owned",
chain=no_ku_chain,
crl_paths=[no_match_crl_path],
owned=True,
expected_error=None,
)
def _ee_not_revoked_ee_depth() -> None:
test_name = "ee_not_revoked_ee_depth"
ee_cert = no_ku_chain[0][0]
int_a_key = no_ku_chain[1][2]
# Generate a CRL that doesn't include the EE cert's serial, but that is issued the same issuer.
ee_not_revoked_crl = _crl(
serials=[12345], # Some serial that isn't the ee_cert.serial.
issuer_name=ee_cert.issuer,
issuer_key=int_a_key,
)
ee_not_revoked_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(
ee_not_revoked_crl_path,
ee_not_revoked_crl.public_bytes(Encoding.DER),
force,
)
# Providing a CRL that's relevant and verifies, but that doesn't include the EE cert serial should verify
# without err.
_revocation_test(
test_name=test_name,
chain=no_ku_chain,
crl_paths=[ee_not_revoked_crl_path],
owned=False,
expected_error=None,
)
_revocation_test(
test_name=test_name + "_owned",
chain=no_ku_chain,
crl_paths=[ee_not_revoked_crl_path],
owned=True,
expected_error=None,
)
def _ee_revoked_badsig_ee_depth() -> None:
test_name = "ee_revoked_badsig_ee_depth"
ee_cert = no_ku_chain[0][0]
# Generate a CRL that includes the EE cert's serial, and that is issued the same issuer, but that
# has an invalid signature.
rand_key: ec.EllipticCurvePrivateKey = ec.generate_private_key(
ec.SECP256R1(), default_backend()
)
ee_revoked_badsig = _crl(
serials=[ee_cert.serial_number],
issuer_name=ee_cert.issuer,
issuer_key=rand_key, # NB: Using a random key to sign, not int_a_key.
)
ee_revoked_badsig_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(
ee_revoked_badsig_path, ee_revoked_badsig.public_bytes(Encoding.DER), force
)
# Providing a relevant CRL that includes the EE cert serial but does not verify should error.
_revocation_test(
test_name=test_name,
chain=no_ku_chain,
crl_paths=[ee_revoked_badsig_path],
owned=False,
expected_error="InvalidCrlSignatureForPublicKey",
)
_revocation_test(
test_name=test_name + "_owned",
chain=no_ku_chain,
crl_paths=[ee_revoked_badsig_path],
owned=True,
expected_error="InvalidCrlSignatureForPublicKey",
)
def _ee_revoked_wrong_ku_ee_depth() -> None:
test_name = "ee_revoked_wrong_ku_ee_depth"
ee_cert = no_crl_ku_chain[0][0]
int_a_key = no_crl_ku_chain[1][2]
# Generate a CRL that includes the EE cert's serial, and that is issued by the same issuer (with a KU specified
# but no cRLSign bit)
ee_revoked_crl = _crl(
serials=[ee_cert.serial_number],
issuer_name=ee_cert.issuer,
issuer_key=int_a_key,
)
ee_revoked_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(ee_revoked_crl_path, ee_revoked_crl.public_bytes(Encoding.DER), force)
# Providing a relevant CRL that includes the EE cert serial but was issued by a CA that has a KU specified
# that doesn't include cRLSign should error indicating the CRL issuer can't sign CRLs.
_revocation_test(
test_name=test_name,
chain=no_crl_ku_chain,
crl_paths=[ee_revoked_crl_path],
owned=False,
expected_error="IssuerNotCrlSigner",
)
_revocation_test(
test_name=test_name + "_owned",
chain=no_crl_ku_chain,
crl_paths=[ee_revoked_crl_path],
owned=True,
expected_error="IssuerNotCrlSigner",
)
def _ee_not_revoked_wrong_ku_ee_depth() -> None:
test_name = "ee_not_revoked_wrong_ku_ee_depth"
ee_cert = no_crl_ku_chain[0][0]
int_a_key = no_crl_ku_chain[1][2]
# Generate a CRL that doesn't include the EE cert's serial, but that is issued the same issuer.
ee_not_revoked_crl = _crl(
serials=[12345], # Some serial that isn't the ee_cert.serial.
issuer_name=ee_cert.issuer,
issuer_key=int_a_key,
)
ee_not_revoked_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(
ee_not_revoked_crl_path,
ee_not_revoked_crl.public_bytes(Encoding.DER),
force,
)
# Providing a relevant CRL that includes the EE cert serial but was issued by a CA that has a KU specified
# that doesn't include cRLSign should error indicating the CRL issuer can't sign CRLs.
_revocation_test(
test_name=test_name,
chain=no_crl_ku_chain,
crl_paths=[ee_not_revoked_crl_path],
owned=False,
expected_error="IssuerNotCrlSigner",
)
_revocation_test(
test_name=test_name + "_owned",
chain=no_crl_ku_chain,
crl_paths=[ee_not_revoked_crl_path],
owned=True,
expected_error="IssuerNotCrlSigner",
)
def _ee_revoked_no_ku_ee_depth() -> None:
test_name = "ee_revoked_no_ku_ee_depth"
ee_cert = no_ku_chain[0][0]
int_a_key = no_ku_chain[1][2]
# Generate a CRL that includes the EE cert's serial, and that is issued by the same issuer (without any KU
# specified).
ee_revoked_crl = _crl(
serials=[ee_cert.serial_number],
issuer_name=ee_cert.issuer,
issuer_key=int_a_key,
)
ee_revoked_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(ee_revoked_crl_path, ee_revoked_crl.public_bytes(Encoding.DER), force)
# Providing a relevant CRL that includes the EE cert serial and verifies should error indicating the cert
# was revoked.
_revocation_test(
test_name=test_name,
chain=no_ku_chain,
crl_paths=[ee_revoked_crl_path],
owned=False,
expected_error="CertRevoked",
)
_revocation_test(
test_name=test_name + "_owned",
chain=no_ku_chain,
crl_paths=[ee_revoked_crl_path],
owned=True,
expected_error="CertRevoked",
)
def _ee_revoked_crl_ku_ee_depth() -> None:
test_name = "ee_revoked_crl_ku_ee_depth"
ee_cert = crl_ku_chain[0][0]
int_a_key = crl_ku_chain[1][2]
# Generate a CRL that includes the EE cert's serial, and that is issued by the same issuer (with a KU
# specified that includes cRLSign).
ee_revoked_crl = _crl(
serials=[ee_cert.serial_number],
issuer_name=ee_cert.issuer,
issuer_key=int_a_key,
)
ee_revoked_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(ee_revoked_crl_path, ee_revoked_crl.public_bytes(Encoding.DER), force)
# Providing a relevant CRL that includes the EE cert serial and verifies should error indicating the cert
# was revoked.
_revocation_test(
test_name=test_name,
chain=crl_ku_chain,
crl_paths=[ee_revoked_crl_path],
owned=False,
expected_error="CertRevoked",
)
_revocation_test(
test_name=test_name + "_owned",
chain=crl_ku_chain,
crl_paths=[ee_revoked_crl_path],
owned=True,
expected_error="CertRevoked",
)
def _no_crls_test_chain_depth() -> None:
# Providing no CRLs means the chain should verify without err.
_revocation_test(
test_name="no_crls_test_chain_depth",
chain=no_ku_chain,
crl_paths=[],
owned=False,
expected_error=None,
)
def _no_relevant_crl_chain_depth() -> None:
test_name = "no_relevant_crl_chain_depth"
# Generate a CRL that includes the first intermediate cert's serial, but that is issued by an unknown issuer.
int_a_cert = no_ku_chain[1][0]
no_match_crl = _crl(
serials=[int_a_cert.serial_number],
issuer_name=subject_name_for_test("whatev", test_name),
issuer_key=None,
)
no_match_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(no_match_crl_path, no_match_crl.public_bytes(Encoding.DER), force)
# Providing no relevant CRL means the chain should verify without err.
_revocation_test(
test_name=test_name,
chain=no_ku_chain,
crl_paths=[no_match_crl_path],
owned=False,
expected_error=None,
)
_revocation_test(
test_name=test_name + "_owned",
chain=no_ku_chain,
crl_paths=[no_match_crl_path],
owned=True,
expected_error=None,
)
def _int_not_revoked_chain_depth() -> None:
test_name = "int_not_revoked_chain_depth"
int_a_cert = no_ku_chain[1][0]
int_b_key = no_ku_chain[2][2]
# Generate a CRL that doesn't include the intermediate A cert's serial, but that is issued the same issuer.
int_not_revoked_crl = _crl(
serials=[12345], # Some serial that isn't the int_a_cert.serial.
issuer_name=int_a_cert.issuer,
issuer_key=int_b_key,
)
int_not_revoked_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(
int_not_revoked_crl_path,
int_not_revoked_crl.public_bytes(Encoding.DER),
force,
)
# Providing a CRL that's relevant and verifies, but that doesn't include the intermediate cert serial should
# verify the chain without err.
_revocation_test(
test_name=test_name,
chain=no_ku_chain,
crl_paths=[int_not_revoked_crl_path],
owned=False,
expected_error=None,
)
_revocation_test(
test_name=test_name + "_owned",
chain=no_ku_chain,
crl_paths=[int_not_revoked_crl_path],
owned=True,
expected_error=None,
)
def _int_revoked_badsig_chain_depth() -> None:
test_name = "int_revoked_badsig_chain_depth"
int_a_cert = no_ku_chain[1][0]
# Generate a CRL that includes the intermediate cert's serial, and that is issued the same issuer, but that
# has an invalid signature.
rand_key: ec.EllipticCurvePrivateKey = ec.generate_private_key(
ec.SECP256R1(), default_backend()
)
int_revoked_badsig = _crl(
serials=[int_a_cert.serial_number],
issuer_name=int_a_cert.issuer,
issuer_key=rand_key, # NB: Using a random key to sign, not CA cert's key.
)
int_revoked_badsig_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(
int_revoked_badsig_path,
int_revoked_badsig.public_bytes(Encoding.DER),
force,
)
# Providing a relevant CRL that includes the EE cert serial but does not verify should error.
_revocation_test(
test_name=test_name,
chain=no_ku_chain,
crl_paths=[int_revoked_badsig_path],
owned=False,
expected_error="InvalidCrlSignatureForPublicKey",
)
_revocation_test(
test_name=test_name + "_owned",
chain=no_ku_chain,
crl_paths=[int_revoked_badsig_path],
owned=True,
expected_error="InvalidCrlSignatureForPublicKey",
)
def _int_revoked_wrong_ku_chain_depth() -> None:
test_name = "int_revoked_wrong_ku_chain_depth"
int_a_cert = no_crl_ku_chain[1][0]
int_b_key = no_crl_ku_chain[2][2]
# Generate a CRL that includes the intermediate A cert's serial, and that is issued by the same issuer (with a
# KU specified but no cRLSign bit)
int_revoked_crl = _crl(
serials=[int_a_cert.serial_number],
issuer_name=int_a_cert.issuer,
issuer_key=int_b_key,
)
int_revoked_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(
int_revoked_crl_path, int_revoked_crl.public_bytes(Encoding.DER), force
)
# Providing a relevant CRL that includes the intermediate cert serial but was issued by a CA that has a KU
# specified that doesn't include cRLSign should error indicating the CRL issuer can't sign CRLs.
_revocation_test(
test_name=test_name,
chain=no_crl_ku_chain,
crl_paths=[int_revoked_crl_path],
owned=False,
expected_error="IssuerNotCrlSigner",
)
_revocation_test(
test_name=test_name + "_owned",
chain=no_crl_ku_chain,
crl_paths=[int_revoked_crl_path],
owned=True,
expected_error="IssuerNotCrlSigner",
)
def _ee_revoked_chain_depth() -> None:
test_name = "ee_revoked_chain_depth"
ee_cert = no_ku_chain[0][0]
int_a_key = no_ku_chain[1][2]
# Generate a CRL that includes the EE cert's serial, and that is issued by the same issuer (without any KU
# specified).
ee_revoked_crl = _crl(
serials=[ee_cert.serial_number],
issuer_name=ee_cert.issuer,
issuer_key=int_a_key,
)
ee_revoked_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(ee_revoked_crl_path, ee_revoked_crl.public_bytes(Encoding.DER), force)
# Providing a relevant CRL that includes the EE cert serial and verifies should error indicating the cert
# was revoked when using the Chain revocation check depth (since it implies EndEntity).
_revocation_test(
test_name=test_name,
chain=no_ku_chain,
crl_paths=[ee_revoked_crl_path],
owned=False,
expected_error="CertRevoked",
)
_revocation_test(
test_name=test_name + "_owned",
chain=no_ku_chain,
crl_paths=[ee_revoked_crl_path],
owned=True,
expected_error="CertRevoked",
)
def _int_revoked_no_ku_chain_depth() -> None:
test_name = "int_revoked_no_ku_chain_depth"
int_a_cert = no_ku_chain[1][0]
int_b_key = no_ku_chain[2][2]
# Generate a CRL that includes the intermediate cert's serial, and that is issued by the same issuer
# (without any KU specified).
int_revoked_crl = _crl(
serials=[int_a_cert.serial_number],
issuer_name=int_a_cert.issuer,
issuer_key=int_b_key,
)
int_revoked_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(
int_revoked_crl_path, int_revoked_crl.public_bytes(Encoding.DER), force
)
# Providing a relevant CRL that includes the intermediate cert serial and verifies, and using the
# Chain depth should error since an intermediate cert is revoked.
_revocation_test(
test_name=test_name,
chain=no_ku_chain,
crl_paths=[int_revoked_crl_path],
owned=False,
expected_error="CertRevoked",
)
_revocation_test(
test_name=test_name + "_owned",
chain=no_ku_chain,
crl_paths=[int_revoked_crl_path],
owned=True,
expected_error="CertRevoked",
)
def _int_revoked_crl_ku_chain_depth() -> None:
test_name = "int_revoked_crl_ku_chain_depth"
int_a_cert = crl_ku_chain[1][0]
int_b_key = crl_ku_chain[2][2]
# Generate a CRL that includes the intermediate cert's serial, and that is issued by the same issuer (with a KU
# specified that includes cRLSign).
int_revoked_crl = _crl(
serials=[int_a_cert.serial_number],
issuer_name=int_a_cert.issuer,
issuer_key=int_b_key,
)
int_revoked_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(
int_revoked_crl_path, int_revoked_crl.public_bytes(Encoding.DER), force
)
# Providing a relevant CRL that includes the EE cert serial and verifies should error indicating the cert
# was revoked.
_revocation_test(
test_name=test_name,
chain=crl_ku_chain,
crl_paths=[int_revoked_crl_path],
owned=False,
expected_error="CertRevoked",
)
_revocation_test(
test_name=test_name + "_owned",
chain=crl_ku_chain,
crl_paths=[int_revoked_crl_path],
owned=True,
expected_error="CertRevoked",
)
def _ee_with_top_bit_set_serial_revoked() -> None:
test_name = "ee_with_top_bit_set_serial_revoked"
ee_cert_topbit = crl_ku_chain[4][0]
int_a_key = crl_ku_chain[1][2]
ee_revoked_crl = _crl(
serials=[ee_cert_topbit.serial_number],
issuer_name=ee_cert_topbit.issuer,
issuer_key=int_a_key,
)
ee_revoked_crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
write_der(ee_revoked_crl_path, ee_revoked_crl.public_bytes(Encoding.DER), force)
_revocation_test(
test_name=test_name,
chain=crl_ku_chain,
crl_paths=[ee_revoked_crl_path],
owned=False,
ee_topbit_serial=True,
expected_error="CertRevoked",
)
_revocation_test(
test_name=test_name + "_owned",
chain=crl_ku_chain,
crl_paths=[ee_revoked_crl_path],
owned=True,
ee_topbit_serial=True,
expected_error="CertRevoked",
)
with trim_top("client_auth_revocation.rs") as output:
_no_crls_test_ee_depth()
_no_relevant_crl_ee_depth()
_ee_not_revoked_ee_depth()
_ee_revoked_badsig_ee_depth()
_ee_revoked_wrong_ku_ee_depth()
_ee_not_revoked_wrong_ku_ee_depth()
_ee_revoked_no_ku_ee_depth()
_ee_revoked_crl_ku_ee_depth()
_no_crls_test_chain_depth()
_no_relevant_crl_chain_depth()
_int_not_revoked_chain_depth()
_int_revoked_badsig_chain_depth()
_int_revoked_wrong_ku_chain_depth()
_ee_revoked_chain_depth()
_int_revoked_no_ku_chain_depth()
_int_revoked_crl_ku_chain_depth()
_ee_with_top_bit_set_serial_revoked()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--tls-server-certs",
action=argparse.BooleanOptionalAction,
default=True,
help="Generate TLS server certificate testcases",
)
parser.add_argument(
"--signatures",
action=argparse.BooleanOptionalAction,
default=True,
help="Generate signature testcases",
)
parser.add_argument(
"--client-auth",
action=argparse.BooleanOptionalAction,
default=True,
help="Generate client auth testcases",
)
parser.add_argument(
"--client-auth-revocation",
action=argparse.BooleanOptionalAction,
default=True,
help="Generate client auth revocation testcases",
)
parser.add_argument(
"--format",
action=argparse.BooleanOptionalAction,
default=True,
help="Run cargo fmt post-generation",
)
parser.add_argument(
"--test",
action=argparse.BooleanOptionalAction,
default=True,
help="Run cargo test post-generation",
)
parser.add_argument(
"--force",
action=argparse.BooleanOptionalAction,
default=False,
help="Overwrite existing test keys/certs",
)
args = parser.parse_args()
if args.tls_server_certs:
tls_server_certs(args.force)
if args.signatures:
signatures(args.force)
if args.client_auth:
client_auth(args.force)
if args.client_auth_revocation:
client_auth_revocation(args.force)
if args.format:
subprocess.run("cargo fmt", shell=True, check=True)
if args.test:
subprocess.run("cargo test", shell=True, check=True)