1
0
mirror of https://github.com/samba-team/samba.git synced 2025-09-13 17:44:21 +03:00
Files
samba-mirror/python/samba/tests/krb5/s4u_tests.py
Rob van der Linde 6ac4833678 python: tests: update all super calls to python 3 style in tests
Signed-off-by: Rob van der Linde <rob@catalyst.net.nz>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>

[abartlet@samba.org Some python2 style super() calls remain due
 to being an actual, even if reasonable, behaviour change]
2023-11-30 01:05:32 +00:00

1839 lines
74 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# Unix SMB/CIFS implementation.
# Copyright (C) Stefan Metzmacher 2020
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import sys
import os
sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"
import functools
import time
from samba import dsdb, ntstatus
from samba.dcerpc import krb5pac, lsa, security
from samba.tests import env_get_var_value
from samba.tests.krb5.kcrypto import Cksumtype, Enctype
from samba.tests.krb5.kdc_base_test import KDCBaseTest
from samba.tests.krb5.raw_testcase import (
RawKerberosTest,
RodcPacEncryptionKey,
ZeroedChecksumKey
)
from samba.tests.krb5.rfc4120_constants import (
AD_IF_RELEVANT,
AES256_CTS_HMAC_SHA1_96,
ARCFOUR_HMAC_MD5,
KDC_ERR_BADMATCH,
KDC_ERR_BADOPTION,
KDC_ERR_BAD_INTEGRITY,
KDC_ERR_GENERIC,
KDC_ERR_INAPP_CKSUM,
KDC_ERR_MODIFIED,
KDC_ERR_SUMTYPE_NOSUPP,
KDC_ERR_TGT_REVOKED,
KU_AS_REP_ENC_PART,
KU_PA_ENC_TIMESTAMP,
KU_TGS_REP_ENC_PART_SUB_KEY,
KU_TGS_REQ_AUTH_DAT_SESSION,
KU_TGS_REQ_AUTH_DAT_SUBKEY,
NT_PRINCIPAL,
)
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
SidType = RawKerberosTest.SidType
global_asn1_print = False
global_hexdump = False
class S4UKerberosTests(KDCBaseTest):
default_attrs = security.SE_GROUP_DEFAULT_FLAGS
def setUp(self):
super().setUp()
self.do_asn1_print = global_asn1_print
self.do_hexdump = global_hexdump
def _test_s4u2self(self, pa_s4u2self_ctype=None):
service_creds = self.get_service_creds()
service = service_creds.get_username()
realm = service_creds.get_realm()
cname = self.PrincipalName_create(name_type=1, names=[service])
sname = self.PrincipalName_create(name_type=2, names=["krbtgt", realm])
till = self.get_KerberosTime(offset=36000)
kdc_options = krb5_asn1.KDCOptions('forwardable')
padata = None
etypes = (18, 17, 23)
req = self.AS_REQ_create(padata=padata,
kdc_options=str(kdc_options),
cname=cname,
realm=realm,
sname=sname,
from_time=None,
till_time=till,
renew_time=None,
nonce=0x7fffffff,
etypes=etypes,
addresses=None,
additional_tickets=None)
rep = self.send_recv_transaction(req)
self.assertIsNotNone(rep)
self.assertEqual(rep['msg-type'], 30)
self.assertEqual(rep['error-code'], 25)
rep_padata = self.der_decode(
rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA())
for pa in rep_padata:
if pa['padata-type'] == 19:
etype_info2 = pa['padata-value']
break
etype_info2 = self.der_decode(
etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2())
key = self.PasswordKey_from_etype_info2(service_creds, etype_info2[0])
(patime, pausec) = self.get_KerberosTimeWithUsec()
pa_ts = self.PA_ENC_TS_ENC_create(patime, pausec)
pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
pa_ts = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, pa_ts)
pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.EncryptedData())
pa_ts = self.PA_DATA_create(2, pa_ts)
kdc_options = krb5_asn1.KDCOptions('forwardable')
padata = [pa_ts]
req = self.AS_REQ_create(padata=padata,
kdc_options=str(kdc_options),
cname=cname,
realm=realm,
sname=sname,
from_time=None,
till_time=till,
renew_time=None,
nonce=0x7fffffff,
etypes=etypes,
addresses=None,
additional_tickets=None)
rep = self.send_recv_transaction(req)
self.assertIsNotNone(rep)
msg_type = rep['msg-type']
self.assertEqual(msg_type, 11)
enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
# MIT KDC encodes both EncASRepPart and EncTGSRepPart with
# application tag 26
try:
enc_part2 = self.der_decode(
enc_part2, asn1Spec=krb5_asn1.EncASRepPart())
except Exception:
enc_part2 = self.der_decode(
enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
# S4U2Self Request
sname = cname
for_user_name = env_get_var_value('FOR_USER')
uname = self.PrincipalName_create(name_type=1, names=[for_user_name])
kdc_options = krb5_asn1.KDCOptions('forwardable')
till = self.get_KerberosTime(offset=36000)
ticket = rep['ticket']
ticket_session_key = self.EncryptionKey_import(enc_part2['key'])
pa_s4u = self.PA_S4U2Self_create(name=uname, realm=realm,
tgt_session_key=ticket_session_key,
ctype=pa_s4u2self_ctype)
padata = [pa_s4u]
subkey = self.RandomKey(ticket_session_key.etype)
(ctime, cusec) = self.get_KerberosTimeWithUsec()
req = self.TGS_REQ_create(padata=padata,
cusec=cusec,
ctime=ctime,
ticket=ticket,
kdc_options=str(kdc_options),
cname=cname,
realm=realm,
sname=sname,
from_time=None,
till_time=till,
renew_time=None,
nonce=0x7ffffffe,
etypes=etypes,
addresses=None,
EncAuthorizationData=None,
EncAuthorizationData_key=None,
additional_tickets=None,
ticket_session_key=ticket_session_key,
authenticator_subkey=subkey)
rep = self.send_recv_transaction(req)
self.assertIsNotNone(rep)
msg_type = rep['msg-type']
if msg_type == 13:
enc_part2 = subkey.decrypt(
KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
enc_part2 = self.der_decode(
enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())
return msg_type
# Using the checksum type from the tgt_session_key happens to work
# everywhere
def test_s4u2self(self):
msg_type = self._test_s4u2self()
self.assertEqual(msg_type, 13)
# Per spec, the checksum of PA-FOR-USER is HMAC_MD5, see [MS-SFU] 2.2.1
def test_s4u2self_hmac_md5_checksum(self):
msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.HMAC_MD5)
self.assertEqual(msg_type, 13)
def test_s4u2self_md5_unkeyed_checksum(self):
msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.MD5)
self.assertEqual(msg_type, 30)
def test_s4u2self_sha1_unkeyed_checksum(self):
msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.SHA1)
self.assertEqual(msg_type, 30)
def test_s4u2self_crc32_unkeyed_checksum(self):
msg_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.CRC32)
self.assertEqual(msg_type, 30)
def _run_s4u2self_test(self, kdc_dict):
client_opts = kdc_dict.pop('client_opts', None)
client_creds = self.get_cached_creds(
account_type=self.AccountType.USER,
opts=client_opts)
service_opts = kdc_dict.pop('service_opts', None)
service_creds = self.get_cached_creds(
account_type=self.AccountType.COMPUTER,
opts=service_opts)
service_tgt = self.get_tgt(service_creds)
modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
if modify_service_tgt_fn is not None:
service_tgt = modify_service_tgt_fn(service_tgt)
client_name = client_creds.get_username()
client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
names=[client_name])
service_name = kdc_dict.pop('service_name', None)
if service_name is None:
service_name = service_creds.get_username()[:-1]
service_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
names=['host', service_name])
realm = client_creds.get_realm()
expected_flags = kdc_dict.pop('expected_flags', None)
if expected_flags is not None:
expected_flags = krb5_asn1.TicketFlags(expected_flags)
unexpected_flags = kdc_dict.pop('unexpected_flags', None)
if unexpected_flags is not None:
unexpected_flags = krb5_asn1.TicketFlags(unexpected_flags)
expected_error_mode = kdc_dict.pop('expected_error_mode', 0)
expect_status = kdc_dict.pop('expect_status', None)
expected_status = kdc_dict.pop('expected_status', None)
if expected_error_mode:
check_error_fn = self.generic_check_kdc_error
check_rep_fn = None
else:
check_error_fn = None
check_rep_fn = self.generic_check_kdc_rep
self.assertIsNone(expect_status)
self.assertIsNone(expected_status)
kdc_options = kdc_dict.pop('kdc_options', '0')
kdc_options = krb5_asn1.KDCOptions(kdc_options)
service_decryption_key = self.TicketDecryptionKey_from_creds(
service_creds)
authenticator_subkey = self.RandomKey(Enctype.AES256)
etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
ARCFOUR_HMAC_MD5))
expect_edata = kdc_dict.pop('expect_edata', None)
expected_groups = kdc_dict.pop('expected_groups', None)
unexpected_groups = kdc_dict.pop('unexpected_groups', None)
def generate_s4u2self_padata(_kdc_exchange_dict,
_callback_dict,
req_body):
pa_s4u = self.PA_S4U2Self_create(
name=client_cname,
realm=realm,
tgt_session_key=service_tgt.session_key,
ctype=None)
return [pa_s4u], req_body
kdc_exchange_dict = self.tgs_exchange_dict(
expected_crealm=realm,
expected_cname=client_cname,
expected_srealm=realm,
expected_sname=service_sname,
expected_account_name=client_name,
expected_groups=expected_groups,
unexpected_groups=unexpected_groups,
expected_sid=client_creds.get_sid(),
expected_flags=expected_flags,
unexpected_flags=unexpected_flags,
ticket_decryption_key=service_decryption_key,
expect_ticket_checksum=True,
generate_padata_fn=generate_s4u2self_padata,
check_error_fn=check_error_fn,
check_rep_fn=check_rep_fn,
check_kdc_private_fn=self.generic_check_kdc_private,
expected_error_mode=expected_error_mode,
expect_status=expect_status,
expected_status=expected_status,
tgt=service_tgt,
authenticator_subkey=authenticator_subkey,
kdc_options=str(kdc_options),
expect_edata=expect_edata)
self._generic_kdc_exchange(kdc_exchange_dict,
cname=None,
realm=realm,
sname=service_sname,
etypes=etypes)
if not expected_error_mode:
# Check that the ticket contains a PAC.
ticket = kdc_exchange_dict['rep_ticket_creds']
pac = self.get_ticket_pac(ticket)
self.assertIsNotNone(pac)
# Ensure we used all the parameters given to us.
self.assertEqual({}, kdc_dict)
# Test performing an S4U2Self operation with a forwardable ticket. The
# resulting ticket should have the 'forwardable' flag set.
def test_s4u2self_forwardable(self):
self._run_s4u2self_test(
{
'client_opts': {
'not_delegated': False
},
'kdc_options': 'forwardable',
'modify_service_tgt_fn': functools.partial(
self.set_ticket_forwardable, flag=True),
'expected_flags': 'forwardable'
})
# Test performing an S4U2Self operation with a forwardable ticket that does
# not contain a PAC. The request should fail.
def test_s4u2self_no_pac(self):
def forwardable_no_pac(ticket):
ticket = self.set_ticket_forwardable(ticket, flag=True)
return self.remove_ticket_pac(ticket)
self._run_s4u2self_test(
{
'expected_error_mode': KDC_ERR_TGT_REVOKED,
'client_opts': {
'not_delegated': False
},
'kdc_options': 'forwardable',
'modify_service_tgt_fn': forwardable_no_pac,
'expected_flags': 'forwardable',
'expect_edata': False
})
# Test performing an S4U2Self operation without requesting a forwardable
# ticket. The resulting ticket should not have the 'forwardable' flag set.
def test_s4u2self_without_forwardable(self):
self._run_s4u2self_test(
{
'client_opts': {
'not_delegated': False
},
'modify_service_tgt_fn': functools.partial(
self.set_ticket_forwardable, flag=True),
'unexpected_flags': 'forwardable'
})
# Do an S4U2Self with a non-forwardable TGT. The 'forwardable' flag should
# not be set on the ticket.
def test_s4u2self_not_forwardable(self):
self._run_s4u2self_test(
{
'client_opts': {
'not_delegated': False
},
'kdc_options': 'forwardable',
'modify_service_tgt_fn': functools.partial(
self.set_ticket_forwardable, flag=False),
'unexpected_flags': 'forwardable'
})
# Do an S4U2Self with the not_delegated flag set on the client. The
# 'forwardable' flag should not be set on the ticket.
def test_s4u2self_client_not_delegated(self):
self._run_s4u2self_test(
{
'client_opts': {
'not_delegated': True
},
'kdc_options': 'forwardable',
'modify_service_tgt_fn': functools.partial(
self.set_ticket_forwardable, flag=True),
'unexpected_flags': 'forwardable'
})
# Do an S4U2Self with a service not trusted to authenticate for delegation,
# but having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
# flag should be set on the ticket.
def test_s4u2self_not_trusted_empty_allowed(self):
self._run_s4u2self_test(
{
'client_opts': {
'not_delegated': False
},
'service_opts': {
'trusted_to_auth_for_delegation': False,
'delegation_to_spn': ()
},
'kdc_options': 'forwardable',
'modify_service_tgt_fn': functools.partial(
self.set_ticket_forwardable, flag=True),
'expected_flags': 'forwardable'
})
# Do an S4U2Self with a service not trusted to authenticate for delegation
# and having a non-empty msDS-AllowedToDelegateTo attribute. The
# 'forwardable' flag should not be set on the ticket.
def test_s4u2self_not_trusted_nonempty_allowed(self):
self._run_s4u2self_test(
{
'client_opts': {
'not_delegated': False
},
'service_opts': {
'trusted_to_auth_for_delegation': False,
'delegation_to_spn': ('test',)
},
'kdc_options': 'forwardable',
'modify_service_tgt_fn': functools.partial(
self.set_ticket_forwardable, flag=True),
'unexpected_flags': 'forwardable'
})
# Do an S4U2Self with a service trusted to authenticate for delegation and
# having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
# flag should be set on the ticket.
def test_s4u2self_trusted_empty_allowed(self):
self._run_s4u2self_test(
{
'client_opts': {
'not_delegated': False
},
'service_opts': {
'trusted_to_auth_for_delegation': True,
'delegation_to_spn': ()
},
'kdc_options': 'forwardable',
'modify_service_tgt_fn': functools.partial(
self.set_ticket_forwardable, flag=True),
'expected_flags': 'forwardable'
})
# Do an S4U2Self with a service trusted to authenticate for delegation and
# having a non-empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
# flag should be set on the ticket.
def test_s4u2self_trusted_nonempty_allowed(self):
self._run_s4u2self_test(
{
'client_opts': {
'not_delegated': False
},
'service_opts': {
'trusted_to_auth_for_delegation': True,
'delegation_to_spn': ('test',)
},
'kdc_options': 'forwardable',
'modify_service_tgt_fn': functools.partial(
self.set_ticket_forwardable, flag=True),
'expected_flags': 'forwardable'
})
# Do an S4U2Self with the sname in the request different to that of the
# service. We expect an error.
def test_s4u2self_wrong_sname(self):
other_creds = self.get_cached_creds(
account_type=self.AccountType.COMPUTER,
opts={
'trusted_to_auth_for_delegation': True,
'id': 0
})
other_sname = other_creds.get_username()[:-1]
self._run_s4u2self_test(
{
'expected_error_mode': KDC_ERR_BADMATCH,
'expect_edata': False,
'client_opts': {
'not_delegated': False
},
'service_opts': {
'trusted_to_auth_for_delegation': True
},
'service_name': other_sname,
'kdc_options': 'forwardable',
'modify_service_tgt_fn': functools.partial(
self.set_ticket_forwardable, flag=True)
})
# Do an S4U2Self where the service does not require authorization data. The
# resulting ticket should still contain a PAC.
def test_s4u2self_no_auth_data_required(self):
self._run_s4u2self_test(
{
'client_opts': {
'not_delegated': False
},
'service_opts': {
'trusted_to_auth_for_delegation': True,
'no_auth_data_required': True
},
'kdc_options': 'forwardable',
'modify_service_tgt_fn': functools.partial(
self.set_ticket_forwardable, flag=True),
'expected_flags': 'forwardable'
})
# Do an S4U2Self and check that the service asserted identity is part of
# the sids.
def test_s4u2self_asserted_identity(self):
self._run_s4u2self_test(
{
'client_opts': {
'not_delegated': False
},
'expected_groups': {
(security.SID_SERVICE_ASSERTED_IDENTITY,
SidType.EXTRA_SID,
self.default_attrs),
...
},
'unexpected_groups': {
security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
},
})
def _run_delegation_test(self, kdc_dict):
s4u2self = kdc_dict.pop('s4u2self', False)
authtime_delay = kdc_dict.pop('authtime_delay', 0)
client_opts = kdc_dict.pop('client_opts', None)
client_creds = self.get_cached_creds(
account_type=self.AccountType.USER,
opts=client_opts)
sid = client_creds.get_sid()
service1_opts = kdc_dict.pop('service1_opts', {})
service2_opts = kdc_dict.pop('service2_opts', {})
allow_delegation = kdc_dict.pop('allow_delegation', False)
allow_rbcd = kdc_dict.pop('allow_rbcd', False)
self.assertFalse(allow_delegation and allow_rbcd)
if allow_rbcd:
service1_creds = self.get_cached_creds(
account_type=self.AccountType.COMPUTER,
opts=service1_opts)
self.assertNotIn('delegation_from_dn', service2_opts)
service2_opts['delegation_from_dn'] = str(service1_creds.get_dn())
service2_creds = self.get_cached_creds(
account_type=self.AccountType.COMPUTER,
opts=service2_opts)
else:
service2_creds = self.get_cached_creds(
account_type=self.AccountType.COMPUTER,
opts=service2_opts)
if allow_delegation:
self.assertNotIn('delegation_to_spn', service1_opts)
service1_opts['delegation_to_spn'] = service2_creds.get_spn()
service1_creds = self.get_cached_creds(
account_type=self.AccountType.COMPUTER,
opts=service1_opts)
service1_tgt = self.get_tgt(service1_creds)
self.assertElementPresent(service1_tgt.ticket_private, 'authtime')
service1_tgt_authtime = self.getElementValue(service1_tgt.ticket_private, 'authtime')
client_username = client_creds.get_username()
client_realm = client_creds.get_realm()
client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
names=[client_username])
service1_name = service1_creds.get_username()[:-1]
service1_realm = service1_creds.get_realm()
service1_service = 'host'
service1_sname = self.PrincipalName_create(
name_type=NT_PRINCIPAL, names=[service1_service,
service1_name])
service1_decryption_key = self.TicketDecryptionKey_from_creds(
service1_creds)
expect_pac = kdc_dict.pop('expect_pac', True)
expected_groups = kdc_dict.pop('expected_groups', None)
unexpected_groups = kdc_dict.pop('unexpected_groups', None)
client_tkt_options = kdc_dict.pop('client_tkt_options', 'forwardable')
expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
ARCFOUR_HMAC_MD5))
if s4u2self:
self.assertEqual(authtime_delay, 0)
def generate_s4u2self_padata(_kdc_exchange_dict,
_callback_dict,
req_body):
pa_s4u = self.PA_S4U2Self_create(
name=client_cname,
realm=client_realm,
tgt_session_key=service1_tgt.session_key,
ctype=None)
return [pa_s4u], req_body
s4u2self_expected_flags = krb5_asn1.TicketFlags('forwardable')
s4u2self_unexpected_flags = krb5_asn1.TicketFlags('0')
s4u2self_kdc_options = krb5_asn1.KDCOptions('forwardable')
s4u2self_authenticator_subkey = self.RandomKey(Enctype.AES256)
s4u2self_kdc_exchange_dict = self.tgs_exchange_dict(
expected_crealm=client_realm,
expected_cname=client_cname,
expected_srealm=service1_realm,
expected_sname=service1_sname,
expected_account_name=client_username,
expected_groups=expected_groups,
unexpected_groups=unexpected_groups,
expected_sid=sid,
expected_flags=s4u2self_expected_flags,
unexpected_flags=s4u2self_unexpected_flags,
ticket_decryption_key=service1_decryption_key,
generate_padata_fn=generate_s4u2self_padata,
check_rep_fn=self.generic_check_kdc_rep,
check_kdc_private_fn=self.generic_check_kdc_private,
tgt=service1_tgt,
authenticator_subkey=s4u2self_authenticator_subkey,
kdc_options=str(s4u2self_kdc_options),
expect_edata=False)
self._generic_kdc_exchange(s4u2self_kdc_exchange_dict,
cname=None,
realm=service1_realm,
sname=service1_sname,
etypes=etypes)
client_service_tkt = s4u2self_kdc_exchange_dict['rep_ticket_creds']
else:
if authtime_delay != 0:
time.sleep(authtime_delay)
fresh = True
else:
fresh = False
client_tgt = self.get_tgt(client_creds,
kdc_options=client_tkt_options,
expected_flags=expected_flags,
fresh=fresh)
client_service_tkt = self.get_service_ticket(
client_tgt,
service1_creds,
kdc_options=client_tkt_options,
expected_flags=expected_flags,
fresh=fresh)
modify_client_tkt_fn = kdc_dict.pop('modify_client_tkt_fn', None)
if modify_client_tkt_fn is not None:
client_service_tkt = modify_client_tkt_fn(client_service_tkt)
self.assertElementPresent(client_service_tkt.ticket_private, 'authtime')
expected_authtime = self.getElementValue(client_service_tkt.ticket_private, 'authtime')
if authtime_delay > 1:
self.assertNotEqual(expected_authtime, service1_tgt_authtime)
additional_tickets = [client_service_tkt.ticket]
modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
if modify_service_tgt_fn is not None:
service1_tgt = modify_service_tgt_fn(service1_tgt)
kdc_options = kdc_dict.pop('kdc_options', None)
if kdc_options is None:
kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
service2_name = service2_creds.get_username()[:-1]
service2_realm = service2_creds.get_realm()
service2_service = 'host'
service2_sname = self.PrincipalName_create(
name_type=NT_PRINCIPAL, names=[service2_service,
service2_name])
service2_decryption_key = self.TicketDecryptionKey_from_creds(
service2_creds)
service2_etypes = service2_creds.tgs_supported_enctypes
expected_error_mode = kdc_dict.pop('expected_error_mode')
expect_status = kdc_dict.pop('expect_status', None)
expected_status = kdc_dict.pop('expected_status', None)
if expected_error_mode:
check_error_fn = self.generic_check_kdc_error
check_rep_fn = None
else:
check_error_fn = None
check_rep_fn = self.generic_check_kdc_rep
self.assertIsNone(expect_status)
self.assertIsNone(expected_status)
expect_edata = kdc_dict.pop('expect_edata', None)
if expect_edata is not None:
self.assertTrue(expected_error_mode)
pac_options = kdc_dict.pop('pac_options', None)
use_authenticator_subkey = kdc_dict.pop('use_authenticator_subkey', True)
if use_authenticator_subkey:
authenticator_subkey = self.RandomKey(Enctype.AES256)
else:
authenticator_subkey = None
expected_proxy_target = service2_creds.get_spn()
expected_transited_services = kdc_dict.pop(
'expected_transited_services', [])
transited_service = f'host/{service1_name}@{service1_realm}'
expected_transited_services.append(transited_service)
kdc_exchange_dict = self.tgs_exchange_dict(
expected_crealm=client_realm,
expected_cname=client_cname,
expected_srealm=service2_realm,
expected_sname=service2_sname,
expected_account_name=client_username,
expected_groups=expected_groups,
unexpected_groups=unexpected_groups,
expected_sid=sid,
expected_supported_etypes=service2_etypes,
ticket_decryption_key=service2_decryption_key,
check_error_fn=check_error_fn,
check_rep_fn=check_rep_fn,
check_kdc_private_fn=self.generic_check_kdc_private,
expected_error_mode=expected_error_mode,
expect_status=expect_status,
expected_status=expected_status,
callback_dict={},
tgt=service1_tgt,
authenticator_subkey=authenticator_subkey,
kdc_options=kdc_options,
pac_options=pac_options,
expect_edata=expect_edata,
expected_proxy_target=expected_proxy_target,
expected_transited_services=expected_transited_services,
expect_pac=expect_pac)
EncAuthorizationData = kdc_dict.pop('enc-authorization-data', None)
if EncAuthorizationData is not None:
if authenticator_subkey is not None:
EncAuthorizationData_key = authenticator_subkey
EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
else:
EncAuthorizationData_key = client_service_tkt.session_key
EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
else:
EncAuthorizationData_key = None
EncAuthorizationData_usage = None
self._generic_kdc_exchange(kdc_exchange_dict,
cname=None,
realm=service2_realm,
sname=service2_sname,
etypes=etypes,
additional_tickets=additional_tickets,
EncAuthorizationData=EncAuthorizationData,
EncAuthorizationData_key=EncAuthorizationData_key,
EncAuthorizationData_usage=EncAuthorizationData_usage)
if not expected_error_mode:
# Check whether the ticket contains a PAC.
ticket = kdc_exchange_dict['rep_ticket_creds']
self.assertElementEqual(ticket.ticket_private, 'authtime', expected_authtime)
pac = self.get_ticket_pac(ticket, expect_pac=expect_pac)
ticket_auth_data = ticket.ticket_private.get('authorization-data')
expected_num_ticket_auth_data = 0
if expect_pac:
self.assertIsNotNone(pac)
expected_num_ticket_auth_data += 1
else:
self.assertIsNone(pac)
if EncAuthorizationData is not None:
expected_num_ticket_auth_data += len(EncAuthorizationData)
if expected_num_ticket_auth_data == 0:
self.assertIsNone(ticket_auth_data)
else:
self.assertIsNotNone(ticket_auth_data)
self.assertEqual(len(ticket_auth_data),
expected_num_ticket_auth_data)
if EncAuthorizationData is not None:
enc_ad_plain = self.der_encode(
EncAuthorizationData,
asn1Spec=krb5_asn1.AuthorizationData())
req_EncAuthorizationData = self.der_decode(
enc_ad_plain,
asn1Spec=krb5_asn1.AuthorizationData())
rep_EncAuthorizationData = ticket_auth_data.copy()
if expect_pac:
rep_EncAuthorizationData.pop(0)
self.assertEqual(rep_EncAuthorizationData, req_EncAuthorizationData)
# Ensure we used all the parameters given to us.
self.assertEqual({}, kdc_dict)
def skip_unless_fl2008(self):
samdb = self.get_samdb()
functional_level = self.get_domain_functional_level(samdb)
if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
self.skipTest('RBCD requires FL2008')
def test_constrained_delegation(self):
# Test constrained delegation.
self._run_delegation_test(
{
'expected_error_mode': 0,
'allow_delegation': True
})
def test_constrained_delegation_authtime(self):
# Test constrained delegation.
self._run_delegation_test(
{
'expected_error_mode': 0,
'allow_delegation': True,
'authtime_delay': 2,
})
def test_constrained_delegation_with_enc_auth_data_subkey(self):
# Test constrained delegation.
EncAuthorizationData = []
relevant_elems = []
auth_data777 = self.AuthorizationData_create(777, b'AuthorizationData777')
relevant_elems.append(auth_data777)
auth_data999 = self.AuthorizationData_create(999, b'AuthorizationData999')
relevant_elems.append(auth_data999)
ad_relevant = self.der_encode(relevant_elems, asn1Spec=krb5_asn1.AD_IF_RELEVANT())
ad_data = self.AuthorizationData_create(AD_IF_RELEVANT, ad_relevant)
EncAuthorizationData.append(ad_data)
self._run_delegation_test(
{
'expected_error_mode': 0,
'allow_delegation': True,
'enc-authorization-data': EncAuthorizationData,
})
def test_constrained_delegation_with_enc_auth_data_no_subkey(self):
# Test constrained delegation.
EncAuthorizationData = []
relevant_elems = []
auth_data777 = self.AuthorizationData_create(777, b'AuthorizationData777')
relevant_elems.append(auth_data777)
auth_data999 = self.AuthorizationData_create(999, b'AuthorizationData999')
relevant_elems.append(auth_data999)
ad_relevant = self.der_encode(relevant_elems, asn1Spec=krb5_asn1.AD_IF_RELEVANT())
ad_data = self.AuthorizationData_create(AD_IF_RELEVANT, ad_relevant)
EncAuthorizationData.append(ad_data)
self._run_delegation_test(
{
'expected_error_mode': 0,
'allow_delegation': True,
'enc-authorization-data': EncAuthorizationData,
'use_authenticator_subkey': False,
})
def test_constrained_delegation_authentication_asserted_identity(self):
# Test constrained delegation and check asserted identity is the
# authentication authority. Note that we should always find this
# SID for all the requests. Just S4U2Self will have a different SID.
self._run_delegation_test(
{
'expected_error_mode': 0,
'allow_delegation': True,
'expected_groups': {
(security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
SidType.EXTRA_SID,
self.default_attrs),
...
},
'unexpected_groups': {
security.SID_SERVICE_ASSERTED_IDENTITY,
},
})
def test_constrained_delegation_service_asserted_identity(self):
# Test constrained delegation and check asserted identity is the
# service sid is there. This is a S4U2Proxy + S4U2Self test.
self._run_delegation_test(
{
'expected_error_mode': 0,
'allow_delegation': True,
's4u2self': True,
'service1_opts': {
'trusted_to_auth_for_delegation': True,
},
'expected_groups': {
(security.SID_SERVICE_ASSERTED_IDENTITY,
SidType.EXTRA_SID,
self.default_attrs),
...
},
'unexpected_groups': {
security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
},
})
def test_constrained_delegation_no_auth_data_required(self):
# Test constrained delegation.
self._run_delegation_test(
{
'expected_error_mode': 0,
'allow_delegation': True,
'service2_opts': {
'no_auth_data_required': True
},
'expect_pac': False
})
def test_constrained_delegation_existing_delegation_info(self):
# Test constrained delegation with an existing S4U_DELEGATION_INFO
# structure in the PAC.
services = ['service1', 'service2', 'service3']
self._run_delegation_test(
{
'expected_error_mode': 0,
'allow_delegation': True,
'modify_client_tkt_fn': functools.partial(
self.add_delegation_info, services=services),
'expected_transited_services': services
})
def test_constrained_delegation_not_allowed(self):
# Test constrained delegation when the delegating service does not
# allow it.
self._run_delegation_test(
{
'expected_error_mode': KDC_ERR_BADOPTION,
# We arent particular about whether or not we get an NTSTATUS.
'expect_status': None,
'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
'allow_delegation': False
})
def test_constrained_delegation_no_client_pac(self):
# Test constrained delegation when the client service ticket does not
# contain a PAC.
self._run_delegation_test(
{
'expected_error_mode': (KDC_ERR_MODIFIED,
KDC_ERR_TGT_REVOKED),
'allow_delegation': True,
'modify_client_tkt_fn': self.remove_ticket_pac,
'expect_edata': False
})
def test_constrained_delegation_no_service_pac(self):
# Test constrained delegation when the service TGT does not contain a
# PAC.
self._run_delegation_test(
{
'expected_error_mode': KDC_ERR_TGT_REVOKED,
'allow_delegation': True,
'modify_service_tgt_fn': self.remove_ticket_pac,
'expect_edata': False
})
def test_constrained_delegation_no_client_pac_no_auth_data_required(self):
# Test constrained delegation when the client service ticket does not
# contain a PAC.
self._run_delegation_test(
{
'expected_error_mode': (KDC_ERR_MODIFIED,
KDC_ERR_BADOPTION,
KDC_ERR_TGT_REVOKED),
'allow_delegation': True,
'modify_client_tkt_fn': self.remove_ticket_pac,
'expect_edata': False,
'service2_opts': {
'no_auth_data_required': True
}
})
def test_constrained_delegation_no_service_pac_no_auth_data_required(self):
# Test constrained delegation when the service TGT does not contain a
# PAC.
self._run_delegation_test(
{
'expected_error_mode': KDC_ERR_TGT_REVOKED,
'allow_delegation': True,
'modify_service_tgt_fn': self.remove_ticket_pac,
'service2_opts': {
'no_auth_data_required': True
},
'expect_pac': False,
'expect_edata': False
})
def test_constrained_delegation_non_forwardable(self):
# Test constrained delegation with a non-forwardable ticket.
self._run_delegation_test(
{
'expected_error_mode': KDC_ERR_BADOPTION,
# We arent particular about whether or not we get an NTSTATUS.
'expect_status': None,
'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
'allow_delegation': True,
'modify_client_tkt_fn': functools.partial(
self.set_ticket_forwardable, flag=False)
})
def test_constrained_delegation_pac_options_rbcd(self):
# Test constrained delegation, but with the RBCD bit set in the PAC
# options.
self._run_delegation_test(
{
'expected_error_mode': 0,
'pac_options': '0001', # supports RBCD
'allow_delegation': True
})
def test_rbcd(self):
# Test resource-based constrained delegation.
self.skip_unless_fl2008()
self._run_delegation_test(
{
'expected_error_mode': 0,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
})
def test_rbcd_no_auth_data_required(self):
self.skip_unless_fl2008()
self._run_delegation_test(
{
'expected_error_mode': 0,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'service2_opts': {
'no_auth_data_required': True
},
'expect_pac': False
})
def test_rbcd_existing_delegation_info(self):
self.skip_unless_fl2008()
# Test constrained delegation with an existing S4U_DELEGATION_INFO
# structure in the PAC.
services = ['service1', 'service2', 'service3']
self._run_delegation_test(
{
'expected_error_mode': 0,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_client_tkt_fn': functools.partial(
self.add_delegation_info, services=services),
'expected_transited_services': services
})
def test_rbcd_not_allowed(self):
# Test resource-based constrained delegation when the target service
# does not allow it.
self._run_delegation_test(
{
'expected_error_mode': KDC_ERR_BADOPTION,
# We arent particular about whether or not we get an NTSTATUS.
'expect_status': None,
'expected_status': ntstatus.NT_STATUS_NOT_FOUND,
'allow_rbcd': False,
'pac_options': '0001' # supports RBCD
})
def test_rbcd_no_client_pac_a(self):
self.skip_unless_fl2008()
# Test constrained delegation when the client service ticket does not
# contain a PAC, and an empty msDS-AllowedToDelegateTo attribute.
self._run_delegation_test(
{
'expected_error_mode': (KDC_ERR_MODIFIED,
KDC_ERR_TGT_REVOKED),
# We arent particular about whether or not we get an NTSTATUS.
'expect_status': None,
'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_client_tkt_fn': self.remove_ticket_pac
})
def test_rbcd_no_client_pac_b(self):
self.skip_unless_fl2008()
# Test constrained delegation when the client service ticket does not
# contain a PAC, and a non-empty msDS-AllowedToDelegateTo attribute.
self._run_delegation_test(
{
'expected_error_mode': (KDC_ERR_MODIFIED,
KDC_ERR_TGT_REVOKED),
# We arent particular about whether or not we get an NTSTATUS.
'expect_status': None,
'expected_status': ntstatus.NT_STATUS_NO_MATCH,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_client_tkt_fn': self.remove_ticket_pac,
'service1_opts': {
'delegation_to_spn': ('host/test')
}
})
def test_rbcd_no_service_pac(self):
self.skip_unless_fl2008()
# Test constrained delegation when the service TGT does not contain a
# PAC.
self._run_delegation_test(
{
'expected_error_mode': KDC_ERR_TGT_REVOKED,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_service_tgt_fn': self.remove_ticket_pac,
'expect_edata': False
})
def test_rbcd_no_client_pac_no_auth_data_required_a(self):
self.skip_unless_fl2008()
# Test constrained delegation when the client service ticket does not
# contain a PAC, and an empty msDS-AllowedToDelegateTo attribute.
self._run_delegation_test(
{
'expected_error_mode': (KDC_ERR_MODIFIED,
KDC_ERR_TGT_REVOKED),
# We arent particular about whether or not we get an NTSTATUS.
'expect_status': None,
'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_client_tkt_fn': self.remove_ticket_pac,
'service2_opts': {
'no_auth_data_required': True
}
})
def test_rbcd_no_client_pac_no_auth_data_required_b(self):
self.skip_unless_fl2008()
# Test constrained delegation when the client service ticket does not
# contain a PAC, and a non-empty msDS-AllowedToDelegateTo attribute.
self._run_delegation_test(
{
'expected_error_mode': (KDC_ERR_MODIFIED,
KDC_ERR_TGT_REVOKED),
# We arent particular about whether or not we get an NTSTATUS.
'expect_status': None,
'expected_status': ntstatus.NT_STATUS_NO_MATCH,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_client_tkt_fn': self.remove_ticket_pac,
'service1_opts': {
'delegation_to_spn': ('host/test')
},
'service2_opts': {
'no_auth_data_required': True
}
})
def test_rbcd_no_service_pac_no_auth_data_required(self):
self.skip_unless_fl2008()
# Test constrained delegation when the service TGT does not contain a
# PAC.
self._run_delegation_test(
{
'expected_error_mode': KDC_ERR_TGT_REVOKED,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_service_tgt_fn': self.remove_ticket_pac,
'service2_opts': {
'no_auth_data_required': True
},
'expect_edata': False
})
def test_rbcd_non_forwardable(self):
self.skip_unless_fl2008()
# Test resource-based constrained delegation with a non-forwardable
# ticket.
self._run_delegation_test(
{
'expected_error_mode': KDC_ERR_BADOPTION,
# We arent particular about whether or not we get an NTSTATUS.
'expect_status': None,
'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_client_tkt_fn': functools.partial(
self.set_ticket_forwardable, flag=False)
})
def test_rbcd_no_pac_options_a(self):
self.skip_unless_fl2008()
# Test resource-based constrained delegation without the RBCD bit set
# in the PAC options, and an empty msDS-AllowedToDelegateTo attribute.
self._run_delegation_test(
{
'expected_error_mode': KDC_ERR_BADOPTION,
# We arent particular about whether or not we get an NTSTATUS.
'expect_status': None,
'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
'allow_rbcd': True,
'pac_options': '1' # does not support RBCD
})
def test_rbcd_no_pac_options_b(self):
self.skip_unless_fl2008()
# Test resource-based constrained delegation without the RBCD bit set
# in the PAC options, and a non-empty msDS-AllowedToDelegateTo
# attribute.
self._run_delegation_test(
{
'expected_error_mode': KDC_ERR_BADOPTION,
# We arent particular about whether or not we get an NTSTATUS.
'expect_status': None,
'expected_status': ntstatus.NT_STATUS_NO_MATCH,
'allow_rbcd': True,
'pac_options': '1', # does not support RBCD
'service1_opts': {
'delegation_to_spn': ('host/test')
}
})
def test_bronze_bit_constrained_delegation_old_checksum(self):
# Attempt to modify the ticket without updating the PAC checksums.
self._run_delegation_test(
{
'expected_error_mode': (KDC_ERR_MODIFIED,
KDC_ERR_BAD_INTEGRITY),
'allow_delegation': True,
'client_tkt_options': '0', # non-forwardable ticket
'modify_client_tkt_fn': functools.partial(
self.set_ticket_forwardable,
flag=True, update_pac_checksums=False),
'expect_edata': False
})
def test_bronze_bit_rbcd_old_checksum(self):
self.skip_unless_fl2008()
# Attempt to modify the ticket without updating the PAC checksums.
self._run_delegation_test(
{
'expected_error_mode': (KDC_ERR_MODIFIED,
KDC_ERR_BAD_INTEGRITY),
# We arent particular about whether or not we get an NTSTATUS.
'expect_status': None,
'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'client_tkt_options': '0', # non-forwardable ticket
'modify_client_tkt_fn': functools.partial(
self.set_ticket_forwardable,
flag=True, update_pac_checksums=False)
})
def test_constrained_delegation_missing_client_checksum(self):
# Present a user ticket without the required checksums.
for checksum in self.pac_checksum_types:
with self.subTest(checksum=checksum):
if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
expected_error_mode = (KDC_ERR_MODIFIED,
KDC_ERR_BADOPTION)
else:
expected_error_mode = KDC_ERR_GENERIC
self._run_delegation_test(
{
'expected_error_mode': expected_error_mode,
'allow_delegation': True,
'modify_client_tkt_fn': functools.partial(
self.remove_pac_checksum, checksum=checksum),
'expect_edata': False
})
def test_constrained_delegation_missing_service_checksum(self):
# Present the service's ticket without the required checksums.
for checksum in (krb5pac.PAC_TYPE_SRV_CHECKSUM,
krb5pac.PAC_TYPE_KDC_CHECKSUM):
with self.subTest(checksum=checksum):
self._run_delegation_test(
{
'expected_error_mode': KDC_ERR_GENERIC,
# We arent particular about whether or not we get an
# NTSTATUS.
'expect_status': None,
'expected_status':
ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
'allow_delegation': True,
'modify_service_tgt_fn': functools.partial(
self.remove_pac_checksum, checksum=checksum)
})
def test_rbcd_missing_client_checksum(self):
self.skip_unless_fl2008()
# Present a user ticket without the required checksums.
for checksum in self.pac_checksum_types:
with self.subTest(checksum=checksum):
if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
expected_error_mode = (KDC_ERR_MODIFIED,
KDC_ERR_BADOPTION)
else:
expected_error_mode = KDC_ERR_GENERIC
self._run_delegation_test(
{
'expected_error_mode': expected_error_mode,
# We arent particular about whether or not we get an
# NTSTATUS.
'expect_status': None,
'expected_status':
ntstatus.NT_STATUS_NOT_SUPPORTED,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_client_tkt_fn': functools.partial(
self.remove_pac_checksum, checksum=checksum)
})
def test_rbcd_missing_service_checksum(self):
self.skip_unless_fl2008()
# Present the service's ticket without the required checksums.
for checksum in (krb5pac.PAC_TYPE_SRV_CHECKSUM,
krb5pac.PAC_TYPE_KDC_CHECKSUM):
with self.subTest(checksum=checksum):
self._run_delegation_test(
{
'expected_error_mode': KDC_ERR_GENERIC,
# We arent particular about whether or not we get an
# NTSTATUS.
'expect_status': None,
'expected_status':
ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_service_tgt_fn': functools.partial(
self.remove_pac_checksum, checksum=checksum)
})
def test_constrained_delegation_zeroed_client_checksum(self):
# Present a user ticket with invalid checksums.
for checksum in self.pac_checksum_types:
with self.subTest(checksum=checksum):
self._run_delegation_test(
{
'expected_error_mode': (KDC_ERR_MODIFIED,
KDC_ERR_BAD_INTEGRITY),
'allow_delegation': True,
'modify_client_tkt_fn': functools.partial(
self.zeroed_pac_checksum, checksum=checksum),
'expect_edata': False
})
def test_constrained_delegation_zeroed_service_checksum(self):
# Present the service's ticket with invalid checksums.
for checksum in self.pac_checksum_types:
with self.subTest(checksum=checksum):
if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
expected_error_mode = (KDC_ERR_MODIFIED,
KDC_ERR_BAD_INTEGRITY)
# We arent particular about whether or not we get an
# NTSTATUS.
expect_status = None
expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD
else:
expected_error_mode = 0
expect_status = None
expected_status = None
self._run_delegation_test(
{
'expected_error_mode': expected_error_mode,
'expect_status': expect_status,
'expected_status': expected_status,
'allow_delegation': True,
'modify_service_tgt_fn': functools.partial(
self.zeroed_pac_checksum, checksum=checksum)
})
def test_rbcd_zeroed_client_checksum(self):
self.skip_unless_fl2008()
# Present a user ticket with invalid checksums.
for checksum in self.pac_checksum_types:
with self.subTest(checksum=checksum):
self._run_delegation_test(
{
'expected_error_mode': (KDC_ERR_MODIFIED,
KDC_ERR_BAD_INTEGRITY),
# We arent particular about whether or not we get an
# NTSTATUS.
'expect_status': None,
'expected_status':
ntstatus.NT_STATUS_NOT_SUPPORTED,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_client_tkt_fn': functools.partial(
self.zeroed_pac_checksum, checksum=checksum)
})
def test_rbcd_zeroed_service_checksum(self):
self.skip_unless_fl2008()
# Present the service's ticket with invalid checksums.
for checksum in self.pac_checksum_types:
with self.subTest(checksum=checksum):
if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
expected_error_mode = (KDC_ERR_MODIFIED,
KDC_ERR_BAD_INTEGRITY)
# We arent particular about whether or not we get an
# NTSTATUS.
expect_status = None
expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD
else:
expected_error_mode = 0
expect_status = None
expected_status = None
self._run_delegation_test(
{
'expected_error_mode': expected_error_mode,
'expect_status': expect_status,
'expected_status': expected_status,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_service_tgt_fn': functools.partial(
self.zeroed_pac_checksum, checksum=checksum)
})
unkeyed_ctypes = {Cksumtype.MD5, Cksumtype.SHA1, Cksumtype.CRC32}
def test_constrained_delegation_unkeyed_client_checksum(self):
# Present a user ticket with invalid checksums.
for checksum in self.pac_checksum_types:
for ctype in self.unkeyed_ctypes:
with self.subTest(checksum=checksum, ctype=ctype):
if (checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
and ctype == Cksumtype.SHA1):
expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
KDC_ERR_INAPP_CKSUM)
else:
expected_error_mode = (KDC_ERR_GENERIC,
KDC_ERR_INAPP_CKSUM)
self._run_delegation_test(
{
'expected_error_mode': expected_error_mode,
'allow_delegation': True,
'modify_client_tkt_fn': functools.partial(
self.unkeyed_pac_checksum,
checksum=checksum, ctype=ctype),
'expect_edata': False
})
def test_constrained_delegation_unkeyed_service_checksum(self):
# Present the service's ticket with invalid checksums.
for checksum in self.pac_checksum_types:
for ctype in self.unkeyed_ctypes:
with self.subTest(checksum=checksum, ctype=ctype):
if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
# We arent particular about whether or not we get an
# NTSTATUS.
expect_status = None
if ctype == Cksumtype.SHA1:
expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
KDC_ERR_INAPP_CKSUM)
expected_status = ntstatus.NT_STATUS_LOGON_FAILURE
else:
expected_error_mode = (KDC_ERR_GENERIC,
KDC_ERR_INAPP_CKSUM)
expected_status = (
ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
else:
expected_error_mode = 0
expect_status = None
expected_status = None
self._run_delegation_test(
{
'expected_error_mode': expected_error_mode,
'expect_status': expect_status,
'expected_status': expected_status,
'allow_delegation': True,
'modify_service_tgt_fn': functools.partial(
self.unkeyed_pac_checksum,
checksum=checksum, ctype=ctype)
})
def test_rbcd_unkeyed_client_checksum(self):
self.skip_unless_fl2008()
# Present a user ticket with invalid checksums.
for checksum in self.pac_checksum_types:
for ctype in self.unkeyed_ctypes:
with self.subTest(checksum=checksum, ctype=ctype):
if (checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
and ctype == Cksumtype.SHA1):
expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
KDC_ERR_INAPP_CKSUM)
else:
expected_error_mode = (KDC_ERR_GENERIC,
KDC_ERR_INAPP_CKSUM)
self._run_delegation_test(
{
'expected_error_mode': expected_error_mode,
# We arent particular about whether or not we get
# an NTSTATUS.
'expect_status': None,
'expected_status':
ntstatus.NT_STATUS_NOT_SUPPORTED,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_client_tkt_fn': functools.partial(
self.unkeyed_pac_checksum,
checksum=checksum, ctype=ctype)
})
def test_rbcd_unkeyed_service_checksum(self):
self.skip_unless_fl2008()
# Present the service's ticket with invalid checksums.
for checksum in self.pac_checksum_types:
for ctype in self.unkeyed_ctypes:
with self.subTest(checksum=checksum, ctype=ctype):
if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
# We arent particular about whether or not we get an
# NTSTATUS.
expect_status = None
if ctype == Cksumtype.SHA1:
expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
KDC_ERR_INAPP_CKSUM)
expected_status = ntstatus.NT_STATUS_LOGON_FAILURE
else:
expected_error_mode = (KDC_ERR_GENERIC,
KDC_ERR_INAPP_CKSUM)
expected_status = (
ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
else:
expected_error_mode = 0
expect_status = None
expected_status = None
self._run_delegation_test(
{
'expected_error_mode': expected_error_mode,
'expect_status': expect_status,
'expected_status': expected_status,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_service_tgt_fn': functools.partial(
self.unkeyed_pac_checksum,
checksum=checksum, ctype=ctype)
})
def test_constrained_delegation_rc4_client_checksum(self):
# Present a user ticket with RC4 checksums.
samdb = self.get_samdb()
functional_level = self.get_domain_functional_level(samdb)
if functional_level >= dsdb.DS_DOMAIN_FUNCTION_2008:
expected_error_mode = (KDC_ERR_GENERIC,
KDC_ERR_INAPP_CKSUM)
expect_edata = False
else:
expected_error_mode = 0
expect_edata = None
self._run_delegation_test(
{
'expected_error_mode': expected_error_mode,
'allow_delegation': True,
'modify_client_tkt_fn': self.rc4_pac_checksums,
'expect_edata': expect_edata,
})
def test_rbcd_rc4_client_checksum(self):
self.skip_unless_fl2008()
# Present a user ticket with RC4 checksums.
expected_error_mode = (KDC_ERR_GENERIC,
KDC_ERR_BADOPTION)
self._run_delegation_test(
{
'expected_error_mode': expected_error_mode,
# We arent particular about whether or not we get an NTSTATUS.
'expect_status': None,
'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
'modify_client_tkt_fn': self.rc4_pac_checksums,
})
def test_constrained_delegation_rodc_issued(self):
self._run_delegation_test(
{
# Test that RODC-issued constrained delegation tickets are
# accepted.
'expected_error_mode': 0,
'allow_delegation': True,
# Both tickets must be signed by the same RODC.
'modify_client_tkt_fn': self.signed_by_rodc,
'modify_service_tgt_fn': self.issued_by_rodc,
'client_opts': {
'allowed_replication_mock': True,
'revealed_to_mock_rodc': True,
},
'service1_opts': {
'allowed_replication_mock': True,
'revealed_to_mock_rodc': True,
},
})
def test_rbcd_rodc_issued(self):
self.skip_unless_fl2008()
self._run_delegation_test(
{
# Test that RODC-issued constrained delegation tickets are
# accepted.
'expected_error_mode': 0,
'allow_rbcd': True,
'pac_options': '0001', # supports RBCD
# Both tickets must be signed by the same RODC.
'modify_client_tkt_fn': self.signed_by_rodc,
'modify_service_tgt_fn': self.issued_by_rodc,
'client_opts': {
'allowed_replication_mock': True,
'revealed_to_mock_rodc': True,
},
'service1_opts': {
'allowed_replication_mock': True,
'revealed_to_mock_rodc': True,
},
})
def remove_pac_checksum(self, ticket, checksum):
checksum_keys = self.get_krbtgt_checksum_key()
return self.modified_ticket(ticket,
checksum_keys=checksum_keys,
include_checksums={checksum: False})
def zeroed_pac_checksum(self, ticket, checksum):
krbtgt_creds = self.get_krbtgt_creds()
krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
server_key = ticket.decryption_key
checksum_keys = {
krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
}
if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
zeroed_key = server_key
else:
zeroed_key = krbtgt_key
checksum_keys[checksum] = ZeroedChecksumKey(zeroed_key.key,
zeroed_key.kvno)
return self.modified_ticket(ticket,
checksum_keys=checksum_keys,
include_checksums={checksum: True})
def unkeyed_pac_checksum(self, ticket, checksum, ctype):
krbtgt_creds = self.get_krbtgt_creds()
krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
server_key = ticket.decryption_key
checksum_keys = {
krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
krb5pac.PAC_TYPE_FULL_CHECKSUM: krbtgt_key,
}
# Make a copy of the existing key and change the ctype.
key = checksum_keys[checksum]
new_key = RodcPacEncryptionKey(key.key, key.kvno)
new_key.ctype = ctype
checksum_keys[checksum] = new_key
return self.modified_ticket(ticket,
checksum_keys=checksum_keys,
include_checksums={checksum: True})
def rc4_pac_checksums(self, ticket):
krbtgt_creds = self.get_krbtgt_creds()
rc4_krbtgt_key = self.TicketDecryptionKey_from_creds(
krbtgt_creds, etype=Enctype.RC4)
server_key = ticket.decryption_key
checksum_keys = {
krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
krb5pac.PAC_TYPE_KDC_CHECKSUM: rc4_krbtgt_key,
krb5pac.PAC_TYPE_TICKET_CHECKSUM: rc4_krbtgt_key,
krb5pac.PAC_TYPE_FULL_CHECKSUM: rc4_krbtgt_key,
}
include_checksums = {
krb5pac.PAC_TYPE_SRV_CHECKSUM: True,
krb5pac.PAC_TYPE_KDC_CHECKSUM: True,
krb5pac.PAC_TYPE_TICKET_CHECKSUM: True,
krb5pac.PAC_TYPE_FULL_CHECKSUM: True,
}
return self.modified_ticket(ticket,
checksum_keys=checksum_keys,
include_checksums=include_checksums)
def add_delegation_info(self, ticket, *, services):
def modify_pac_fn(pac):
pac_buffers = pac.buffers
self.assertNotIn(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION,
(buffer.type for buffer in pac_buffers))
transited_services = list(map(lsa.String, services))
delegation = krb5pac.PAC_CONSTRAINED_DELEGATION()
delegation.proxy_target = lsa.String('test_proxy_target')
delegation.transited_services = transited_services
delegation.num_transited_services = len(transited_services)
info = krb5pac.PAC_CONSTRAINED_DELEGATION_CTR()
info.info = delegation
pac_buffer = krb5pac.PAC_BUFFER()
pac_buffer.type = krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION
pac_buffer.info = info
pac_buffers.append(pac_buffer)
pac.buffers = pac_buffers
pac.num_buffers += 1
return pac
checksum_keys = self.get_krbtgt_checksum_key()
return self.modified_ticket(ticket,
checksum_keys=checksum_keys,
modify_pac_fn=modify_pac_fn)
def set_ticket_forwardable(self, ticket, flag, update_pac_checksums=True):
modify_fn = functools.partial(self.modify_ticket_flag,
flag='forwardable',
value=flag)
if update_pac_checksums:
checksum_keys = self.get_krbtgt_checksum_key()
else:
checksum_keys = None
return self.modified_ticket(ticket,
modify_fn=modify_fn,
checksum_keys=checksum_keys,
update_pac_checksums=update_pac_checksums)
def remove_ticket_pac(self, ticket):
return self.modified_ticket(ticket,
exclude_pac=True)
if __name__ == "__main__":
global_asn1_print = False
global_hexdump = False
import unittest
unittest.main()