1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-22 22:04:08 +03:00

tests/krb5: Add helper methods for PK-INIT testing

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Joseph Sutton 2023-07-03 14:49:43 +12:00 committed by Andrew Bartlett
parent 7f9547fda7
commit 7584e7a3a1

View File

@ -25,8 +25,14 @@ import random
import binascii
import itertools
import collections
import math
from enum import Enum
from pprint import pprint
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.backends import default_backend
from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
@ -34,6 +40,7 @@ from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
from pyasn1.codec.ber.encoder import BitStringEncoder
import pyasn1.type.univ
from pyasn1.error import PyAsn1Error
@ -1807,6 +1814,22 @@ class RawKerberosTest(TestCase):
Authenticator_obj['authorization-data'] = authorization_data
return Authenticator_obj
def PKAuthenticator_create(self,
cusec,
ctime,
nonce,
*,
pa_checksum=None):
pk_authenticator_obj = {
'cusec': cusec,
'ctime': ctime,
'nonce': nonce,
}
if pa_checksum is not None:
pk_authenticator_obj['paChecksum'] = pa_checksum
return pk_authenticator_obj
def TGS_REQ_create(self,
padata, # optional
cusec,
@ -2019,6 +2042,382 @@ class RawKerberosTest(TestCase):
return krb_priv
def ContentInfo_create(self, content_type, content):
content_info_obj = {
'contentType': content_type,
'content': content,
}
return content_info_obj
def EncapsulatedContentInfo_create(self, content_type, content):
encapsulated_content_info_obj = {
'eContentType': content_type,
'eContent': content,
}
return encapsulated_content_info_obj
def SignedData_create(self,
digest_algorithms,
encap_content_info,
signer_infos,
*,
version=None,
certificates=None,
crls=None):
def is_cert_version_present(version):
return certificates is not None and any(
version in cert for cert in certificates)
def is_crl_version_present(version):
return crls is not None and any(
version in crl for crl in crls)
def is_signer_info_version_present(version):
return signer_infos is not None and any(
signer_info['version'] == version
for signer_info in signer_infos)
def data_version():
# per RFC5652 5.1:
if is_cert_version_present('other') or (
is_crl_version_present('other')):
return 5
if is_cert_version_present('v2AttrCert'):
return 4
if is_cert_version_present('v1AttrCert') or (
is_signer_info_version_present(3)) or (
encap_content_info['eContentType'] != krb5_asn1.id_data
):
return 3
return 1
if version is None:
version = data_version()
signed_data_obj = {
'version': version,
'digestAlgorithms': digest_algorithms,
'encapContentInfo': encap_content_info,
'signerInfos': signer_infos,
}
if certificates is not None:
signed_data_obj['certificates'] = certificates
if crls is not None:
signed_data_obj['crls'] = crls
return signed_data_obj
def AuthPack_create(self,
pk_authenticator,
*,
client_public_value=None,
supported_cms_types=None,
client_dh_nonce=None):
auth_pack_obj = {
'pkAuthenticator': pk_authenticator,
}
if client_public_value is not None:
auth_pack_obj['clientPublicValue'] = client_public_value
if supported_cms_types is not None:
auth_pack_obj['supportedCMSTypes'] = supported_cms_types
if client_dh_nonce is not None:
auth_pack_obj['clientDHNonce'] = client_dh_nonce
return auth_pack_obj
def PK_AS_REQ_create(self,
signed_auth_pack,
*,
trusted_certifiers=None,
kdc_pk_id=None):
content_info_obj = self.ContentInfo_create(
krb5_asn1.id_signedData, signed_auth_pack)
content_info = self.der_encode(content_info_obj,
asn1Spec=krb5_asn1.ContentInfo())
pk_as_req_obj = {
'signedAuthPack': content_info,
}
if trusted_certifiers is not None:
pk_as_req_obj['trustedCertifiers'] = trusted_certifiers
if kdc_pk_id is not None:
pk_as_req_obj['kdcPkId'] = kdc_pk_id
return self.der_encode(pk_as_req_obj,
asn1Spec=krb5_asn1.PA_PK_AS_REQ())
def SignerInfo_create(self,
signer_id,
digest_algorithm,
signature_algorithm,
signature,
*,
version=None,
signed_attrs=None,
unsigned_attrs=None):
if version is None:
# per RFC5652 5.3:
if 'issuerAndSerialNumber' in signer_id:
version = 1
elif 'subjectKeyIdentifier' in signer_id:
version = 3
else:
self.fail(f'unknown signer ID version ({signer_id})')
signer_info_obj = {
'version': version,
'sid': signer_id,
'digestAlgorithm': digest_algorithm,
'signatureAlgorithm': signature_algorithm,
'signature': signature,
}
if signed_attrs is not None:
signer_info_obj['signedAttrs'] = signed_attrs
if unsigned_attrs is not None:
signer_info_obj['unsignedAttrs'] = unsigned_attrs
return signer_info_obj
def SignerIdentifier_create(self, *,
issuer_and_serial_number=None,
subject_key_id=None):
if issuer_and_serial_number is not None:
return {'issuerAndSerialNumber': issuer_and_serial_number}
if subject_key_id is not None:
return {'subjectKeyIdentifier': subject_key_id}
self.fail('identifier not specified')
def AlgorithmIdentifier_create(self,
algorithm,
*,
parameters=None):
algorithm_id_obj = {
'algorithm': algorithm,
}
if parameters is not None:
algorithm_id_obj['parameters'] = parameters
return algorithm_id_obj
def SubjectPublicKeyInfo_create(self,
algorithm,
public_key):
return {
'algorithm': algorithm,
'subjectPublicKey': public_key,
}
def ValidationParms_create(self,
seed,
pgen_counter):
return {
'seed': seed,
'pgenCounter': pgen_counter,
}
def DomainParameters_create(self,
p,
g,
*,
q=None,
j=None,
validation_parms=None):
domain_params_obj = {
'p': p,
'g': g,
}
if q is not None:
domain_params_obj['q'] = q
if j is not None:
domain_params_obj['j'] = j
if validation_parms is not None:
domain_params_obj['validationParms'] = validation_parms
return domain_params_obj
def length_in_bytes(self, value):
"""Return the length in bytes of an integer once it is encoded as
bytes."""
self.assertGreaterEqual(value, 0, 'value must be positive')
self.assertIsInstance(value, int)
length_in_bits = max(1, math.log2(value + 1))
length_in_bytes = math.ceil(length_in_bits / 8)
return length_in_bytes
def bytes_from_int(self, value, *, length=None):
"""Return an integer encoded big-endian into bytes of an optionally
specified length.
"""
if length is None:
length = self.length_in_bytes(value)
return value.to_bytes(length, 'big')
def int_from_bytes(self, data):
"""Return an integer decoded from bytes in big-endian format."""
return int.from_bytes(data, 'big')
def int_from_bit_string(self, string):
"""Return an integer decoded from a bitstring."""
return int(string, base=2)
def bit_string_from_int(self, value):
"""Return a bitstring encoding of an integer."""
string = f'{value:b}'
# The bitstring must be padded to a multiple of 8 bits in length, or
# pyasn1 will interpret it incorrectly (as if the padding bits were
# present, but on the wrong end).
length = len(string)
padding_len = math.ceil(length / 8) * 8 - length
return '0' * padding_len + string
def bit_string_from_bytes(self, data):
"""Return a bitstring encoding of bytes in big-endian format."""
value = self.int_from_bytes(data)
return self.bit_string_from_int(value)
def bytes_from_bit_string(self, string):
"""Return big-endian format bytes encoded from a bitstring."""
value = self.int_from_bit_string(string)
length = math.ceil(len(string) / 8)
return value.to_bytes(length, 'big')
def asn1_length(self, data):
"""Return the ASN.1 encoding of the length of some data."""
length = len(data)
self.assertGreater(length, 0)
if length < 0x80:
return bytes([length])
encoding_len = self.length_in_bytes(length)
self.assertLess(encoding_len, 0x80,
'item is too long to be ASN.1 encoded')
data = self.bytes_from_int(length, length=encoding_len)
return bytes([0x80 | encoding_len]) + data
@staticmethod
def octetstring2key(x, enctype):
"""This implements the function defined in RFC4556 3.2.3.1 “Using
Diffie-Hellman Key Exchange."""
seedsize = kcrypto.seedsize(enctype)
seed = b''
# A counter that cycles through the bytes 0x000xff.
counter = itertools.cycle(map(lambda x: bytes([x]),
range(256)))
while len(seed) < seedsize:
digest = hashes.Hash(hashes.SHA1(), default_backend())
digest.update(next(counter) + x)
seed += digest.finalize()
key = kcrypto.random_to_key(enctype, seed[:seedsize])
return RodcPacEncryptionKey(key, kvno=None)
def unpad(self, data):
"""Return unpadded data."""
padding_len = data[-1]
expected_padding = bytes([padding_len]) * padding_len
self.assertEqual(expected_padding, data[-padding_len:],
'invalid padding bytes')
return data[:-padding_len]
def try_decode(self, data, module=None):
"""Try to decode some data of unknown type with various known ASN.1
schemata (optionally restricted to those from a particular module) and
print any results that seem promising. For use when debugging.
"""
if module is None:
# Try a couple of known ASN.1 modules.
self.try_decode(data, krb5_asn1)
self.try_decode(data, pyasn1.type.univ)
# Its helpful to stop and give the user a chance to examine the
# results.
self.fail('decoding done')
names = dir(module)
for name in names:
item = getattr(module, name)
if not callable(item):
continue
try:
decoded = self.der_decode(data, asn1Spec=item())
except Exception:
# Initiating the schema or decoding the ASN.1 failed for
# whatever reason.
pass
else:
# Decoding succeeded: print the structure to be examined.
print(f'\t{name}')
pprint(decoded)
def cipher_from_algorithm(self, algorithm):
if algorithm == str(krb5_asn1.aes256_CBC_PAD):
return algorithms.AES
if algorithm == str(krb5_asn1.des_EDE3_CBC):
return algorithms.TripleDES
self.fail(f'unknown cipher algorithm {algorithm}')
def hash_from_algorithm(self, algorithm):
# Let someone pass in an ObjectIdentifier.
algorithm = str(algorithm)
if algorithm == str(krb5_asn1.id_sha1):
return hashes.SHA1
if algorithm == str(krb5_asn1.sha1WithRSAEncryption):
return hashes.SHA1
if algorithm == str(krb5_asn1.rsaEncryption):
return hashes.SHA1
if algorithm == str(krb5_asn1.id_pkcs1_sha256WithRSAEncryption):
return hashes.SHA256
if algorithm == str(krb5_asn1.id_sha512):
return hashes.SHA512
self.fail(f'unknown hash algorithm {algorithm}')
def hash_from_algorithm_id(self, algorithm_id):
self.assertIsInstance(algorithm_id, dict)
hash = self.hash_from_algorithm(algorithm_id['algorithm'])
parameters = algorithm_id.get('parameters')
if self.strict_checking:
self.assertIsNotNone(parameters)
if parameters is not None:
self.assertEqual(b'\x05\x00', parameters)
return hash
def kpasswd_create(self,
subkey,
user_data,
@ -4000,6 +4399,10 @@ class RawKerberosTest(TestCase):
return max(filter(lambda e: e in etypes, proposed_etypes),
default=None)
@staticmethod
def first_common_etype(etypes, proposed_etypes):
return next(filter(lambda e: e in etypes, proposed_etypes), None)
def supported_aes_rc4_etypes(self, kdc_exchange_dict):
creds = kdc_exchange_dict['creds']
supported_etypes = self.get_default_enctypes(creds)
@ -4029,6 +4432,16 @@ class RawKerberosTest(TestCase):
return expected_aes, expected_rc4
def expected_etype(self, kdc_exchange_dict):
req_body = kdc_exchange_dict['req_body']
proposed_etypes = req_body['etype']
aes_etypes, rc4_etypes = self.supported_aes_rc4_etypes(
kdc_exchange_dict)
return self.first_common_etype(aes_etypes | rc4_etypes,
proposed_etypes)
def check_rep_padata(self,
kdc_exchange_dict,
callback_dict,
@ -4951,6 +5364,11 @@ class RawKerberosTest(TestCase):
return PADATA_REQ_ENC_PA_REP in fast_pa_dict
def sent_pk_as_req(self, kdc_exchange_dict):
fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
return PADATA_PK_AS_REQ in fast_pa_dict
def get_sent_pac_options(self, kdc_exchange_dict):
fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)