1
0
mirror of https://github.com/samba-team/samba.git synced 2025-12-20 16:23:51 +03:00

tests/krb5: Add method for modifying a ticket and creating PAC checksums

BUG: https://bugzilla.samba.org/show_bug.cgi?id=14642

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Joseph Sutton
2021-09-17 15:26:12 +12:00
committed by Andrew Bartlett
parent 12b5e72a35
commit 1fcde7cb6c

View File

@@ -3013,6 +3013,240 @@ class RawKerberosTest(TestCaseInTempDir):
ticket_ctype,
ticket_checksum)
def modified_ticket(self,
ticket, *,
new_ticket_key=None,
modify_fn=None,
modify_pac_fn=None,
exclude_pac=False,
update_pac_checksums=True,
checksum_keys=None,
include_checksums=None):
if checksum_keys is None:
# A dict containing a key for each checksum type to be created in
# the PAC.
checksum_keys = {}
if include_checksums is None:
# A dict containing a value for each checksum type; True if the
# checksum type is to be included in the PAC, False if it is to be
# excluded, or None/not present if the checksum is to be included
# based on its presence in the original PAC.
include_checksums = {}
# Check that the values passed in by the caller make sense.
self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)
if exclude_pac:
self.assertIsNone(modify_pac_fn)
update_pac_checksums = False
if not update_pac_checksums:
self.assertFalse(checksum_keys)
self.assertFalse(include_checksums)
expect_pac = update_pac_checksums or modify_pac_fn is not None
key = ticket.decryption_key
if new_ticket_key is None:
# Use the same key to re-encrypt the ticket.
new_ticket_key = key
if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
# If the server signature key is not present, fall back to the key
# used to encrypt the ticket.
checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key
if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
# If the ticket signature key is not present, fall back to the key
# used for the KDC signature.
kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
if kdc_checksum_key is not None:
checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
kdc_checksum_key)
# Decrypt the ticket.
enc_part = ticket.ticket['enc-part']
self.assertElementEqual(enc_part, 'etype', key.etype)
self.assertElementKVNO(enc_part, 'kvno', key.kvno)
enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
enc_part = self.der_decode(
enc_part, asn1Spec=krb5_asn1.EncTicketPart())
# Modify the ticket here.
if modify_fn is not None:
enc_part = modify_fn(enc_part)
auth_data = enc_part.get('authorization-data')
if expect_pac:
self.assertIsNotNone(auth_data)
if auth_data is not None:
new_pac = None
if not exclude_pac:
# Get a copy of the authdata with an empty PAC, and the
# existing PAC (if present).
empty_pac = self.get_empty_pac()
empty_pac_auth_data, pac_data = self.replace_pac(auth_data,
empty_pac)
if expect_pac:
self.assertIsNotNone(pac_data)
if pac_data is not None:
pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
# Modify the PAC here.
if modify_pac_fn is not None:
pac = modify_pac_fn(pac)
if update_pac_checksums:
# Get the enc-part with an empty PAC, which is needed
# to create a ticket signature.
enc_part_to_sign = enc_part.copy()
enc_part_to_sign['authorization-data'] = (
empty_pac_auth_data)
enc_part_to_sign = self.der_encode(
enc_part_to_sign,
asn1Spec=krb5_asn1.EncTicketPart())
self.update_pac_checksums(pac,
checksum_keys,
include_checksums,
enc_part_to_sign)
# Re-encode the PAC.
pac_data = ndr_pack(pac)
new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
pac_data)
# Replace the PAC in the authorization data and re-add it to the
# ticket enc-part.
auth_data, _ = self.replace_pac(auth_data, new_pac)
enc_part['authorization-data'] = auth_data
# Re-encrypt the ticket enc-part with the new key.
enc_part_new = self.der_encode(enc_part,
asn1Spec=krb5_asn1.EncTicketPart())
enc_part_new = self.EncryptedData_create(new_ticket_key,
KU_TICKET,
enc_part_new)
# Create a copy of the ticket with the new enc-part.
new_ticket = ticket.ticket.copy()
new_ticket['enc-part'] = enc_part_new
new_ticket_creds = KerberosTicketCreds(
new_ticket,
session_key=ticket.session_key,
crealm=ticket.crealm,
cname=ticket.cname,
srealm=ticket.srealm,
sname=ticket.sname,
decryption_key=new_ticket_key,
ticket_private=enc_part,
encpart_private=ticket.encpart_private)
return new_ticket_creds
def update_pac_checksums(self,
pac,
checksum_keys,
include_checksums,
enc_part=None):
pac_buffers = pac.buffers
checksum_buffers = {}
# Find the relevant PAC checksum buffers.
for pac_buffer in pac_buffers:
buffer_type = pac_buffer.type
if buffer_type in self.pac_checksum_types:
self.assertNotIn(buffer_type, checksum_buffers,
f'Duplicate checksum type {buffer_type}')
checksum_buffers[buffer_type] = pac_buffer
# Create any additional buffers that were requested but not
# present. Conversely, remove any buffers that were requested to be
# removed.
for buffer_type in self.pac_checksum_types:
if buffer_type in checksum_buffers:
if include_checksums.get(buffer_type) is False:
checksum_buffer = checksum_buffers.pop(buffer_type)
pac.num_buffers -= 1
pac_buffers.remove(checksum_buffer)
elif include_checksums.get(buffer_type) is True:
info = krb5pac.PAC_SIGNATURE_DATA()
checksum_buffer = krb5pac.PAC_BUFFER()
checksum_buffer.type = buffer_type
checksum_buffer.info = info
pac_buffers.append(checksum_buffer)
pac.num_buffers += 1
checksum_buffers[buffer_type] = checksum_buffer
# Fill the relevant checksum buffers.
for buffer_type, checksum_buffer in checksum_buffers.items():
checksum_key = checksum_keys[buffer_type]
ctype = checksum_key.ctype & ((1 << 32) - 1)
if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
self.assertIsNotNone(enc_part)
signature = checksum_key.make_checksum(
KU_NON_KERB_CKSUM_SALT,
enc_part)
elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
signature = Krb5EncryptionKey.make_zeroed_checksum(
checksum_key)
else:
signature = checksum_key.make_zeroed_checksum()
checksum_buffer.info.signature = signature
checksum_buffer.info.type = ctype
# Add the new checksum buffers to the PAC.
pac.buffers = pac_buffers
# Calculate the server and KDC checksums and insert them into the PAC.
server_checksum_buffer = checksum_buffers.get(
krb5pac.PAC_TYPE_SRV_CHECKSUM)
if server_checksum_buffer is not None:
server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]
pac_data = ndr_pack(pac)
server_checksum = Krb5EncryptionKey.make_checksum(
server_checksum_key,
KU_NON_KERB_CKSUM_SALT,
pac_data)
server_checksum_buffer.info.signature = server_checksum
kdc_checksum_buffer = checksum_buffers.get(
krb5pac.PAC_TYPE_KDC_CHECKSUM)
if kdc_checksum_buffer is not None:
self.assertIsNotNone(server_checksum_buffer)
kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]
kdc_checksum = kdc_checksum_key.make_checksum(
KU_NON_KERB_CKSUM_SALT,
server_checksum)
kdc_checksum_buffer.info.signature = kdc_checksum
def replace_pac(self, auth_data, new_pac, expect_pac=True):
if new_pac is not None:
self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)