mirror of
https://github.com/samba-team/samba.git
synced 2025-09-13 17:44:21 +03:00
Signed-off-by: Rob van der Linde <rob@catalyst.net.nz> Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org> [abartlet@samba.org Some python2 style super() calls remain due to being an actual, even if reasonable, behaviour change]
1839 lines
74 KiB
Python
Executable File
1839 lines
74 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
# Unix SMB/CIFS implementation.
|
||
# Copyright (C) Stefan Metzmacher 2020
|
||
#
|
||
# This program is free software; you can redistribute it and/or modify
|
||
# it under the terms of the GNU General Public License as published by
|
||
# the Free Software Foundation; either version 3 of the License, or
|
||
# (at your option) any later version.
|
||
#
|
||
# This program is distributed in the hope that it will be useful,
|
||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
# GNU General Public License for more details.
|
||
#
|
||
# You should have received a copy of the GNU General Public License
|
||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
#
|
||
|
||
import sys
|
||
import os
|
||
|
||
sys.path.insert(0, "bin/python")
|
||
os.environ["PYTHONUNBUFFERED"] = "1"
|
||
|
||
import functools
|
||
import time
|
||
|
||
from samba import dsdb, ntstatus
|
||
from samba.dcerpc import krb5pac, lsa, security
|
||
|
||
from samba.tests import env_get_var_value
|
||
from samba.tests.krb5.kcrypto import Cksumtype, Enctype
|
||
from samba.tests.krb5.kdc_base_test import KDCBaseTest
|
||
from samba.tests.krb5.raw_testcase import (
|
||
RawKerberosTest,
|
||
RodcPacEncryptionKey,
|
||
ZeroedChecksumKey
|
||
)
|
||
from samba.tests.krb5.rfc4120_constants import (
|
||
AD_IF_RELEVANT,
|
||
AES256_CTS_HMAC_SHA1_96,
|
||
ARCFOUR_HMAC_MD5,
|
||
KDC_ERR_BADMATCH,
|
||
KDC_ERR_BADOPTION,
|
||
KDC_ERR_BAD_INTEGRITY,
|
||
KDC_ERR_GENERIC,
|
||
KDC_ERR_INAPP_CKSUM,
|
||
KDC_ERR_MODIFIED,
|
||
KDC_ERR_SUMTYPE_NOSUPP,
|
||
KDC_ERR_TGT_REVOKED,
|
||
KU_AS_REP_ENC_PART,
|
||
KU_PA_ENC_TIMESTAMP,
|
||
KU_TGS_REP_ENC_PART_SUB_KEY,
|
||
KU_TGS_REQ_AUTH_DAT_SESSION,
|
||
KU_TGS_REQ_AUTH_DAT_SUBKEY,
|
||
NT_PRINCIPAL,
|
||
)
|
||
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
|
||
|
||
SidType = RawKerberosTest.SidType
|
||
|
||
global_asn1_print = False
|
||
global_hexdump = False
|
||
|
||
|
||
class S4UKerberosTests(KDCBaseTest):
|
||
|
||
default_attrs = security.SE_GROUP_DEFAULT_FLAGS
|
||
|
||
def setUp(self):
    """Run the base-class setup, then apply this module's debug switches."""
    super().setUp()
    # Copy the module-level toggles onto the instance in one step.
    self.do_asn1_print, self.do_hexdump = global_asn1_print, global_hexdump
|
||
|
||
def _test_s4u2self(self, pa_s4u2self_ctype=None):
    """Obtain a TGT for the service account, then send an S4U2Self TGS-REQ.

    The service account authenticates with an encrypted timestamp after
    the KDC has answered the first AS-REQ with an error carrying the
    etype information.  The user to impersonate is read from the
    FOR_USER environment variable.

    pa_s4u2self_ctype is forwarded as ``ctype`` to PA_S4U2Self_create().

    Returns the ``msg-type`` of the reply to the S4U2Self request, so
    that callers can assert on success or failure themselves.
    """
    service_creds = self.get_service_creds()
    service = service_creds.get_username()
    realm = service_creds.get_realm()

    cname = self.PrincipalName_create(name_type=1, names=[service])
    krbtgt_sname = self.PrincipalName_create(name_type=2,
                                             names=["krbtgt", realm])

    till = self.get_KerberosTime(offset=36000)

    etypes = (18, 17, 23)

    def as_exchange(padata):
        # Both AS-REQs are identical apart from the padata they carry.
        kdc_options = krb5_asn1.KDCOptions('forwardable')
        req = self.AS_REQ_create(padata=padata,
                                 kdc_options=str(kdc_options),
                                 cname=cname,
                                 realm=realm,
                                 sname=krbtgt_sname,
                                 from_time=None,
                                 till_time=till,
                                 renew_time=None,
                                 nonce=0x7fffffff,
                                 etypes=etypes,
                                 addresses=None,
                                 additional_tickets=None)
        rep = self.send_recv_transaction(req)
        self.assertIsNotNone(rep)
        return rep

    # First AS-REQ, without pre-authentication.  Per RFC 4120 msg-type 30
    # is KRB-ERROR and error-code 25 is KDC_ERR_PREAUTH_REQUIRED.
    rep = as_exchange(None)

    self.assertEqual(rep['msg-type'], 30)
    self.assertEqual(rep['error-code'], 25)
    rep_padata = self.der_decode(
        rep['e-data'], asn1Spec=krb5_asn1.METHOD_DATA())

    # Look for padata-type 19 (PA-ETYPE-INFO2 in RFC 4120).  Start from
    # None so that a reply lacking it fails with a clear assertion
    # instead of a NameError further down.
    etype_info2 = None
    for pa in rep_padata:
        if pa['padata-type'] == 19:
            etype_info2 = pa['padata-value']
            break
    self.assertIsNotNone(
        etype_info2,
        'KDC error reply did not contain padata-type 19 (ETYPE-INFO2)')

    etype_info2 = self.der_decode(
        etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2())

    key = self.PasswordKey_from_etype_info2(service_creds, etype_info2[0])

    # Build the encrypted-timestamp padata (padata-type 2).
    (patime, pausec) = self.get_KerberosTimeWithUsec()
    pa_ts = self.PA_ENC_TS_ENC_create(patime, pausec)
    pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())

    pa_ts = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, pa_ts)
    pa_ts = self.der_encode(pa_ts, asn1Spec=krb5_asn1.EncryptedData())

    pa_ts = self.PA_DATA_create(2, pa_ts)

    # Second AS-REQ, now pre-authenticated; msg-type 11 is AS-REP.
    rep = as_exchange([pa_ts])

    msg_type = rep['msg-type']
    self.assertEqual(msg_type, 11)

    enc_part2 = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
    # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
    # application tag 26
    try:
        enc_part2 = self.der_decode(
            enc_part2, asn1Spec=krb5_asn1.EncASRepPart())
    except Exception:
        enc_part2 = self.der_decode(
            enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())

    # S4U2Self Request: the service asks for a ticket to itself.
    tgs_sname = cname

    for_user_name = env_get_var_value('FOR_USER')
    uname = self.PrincipalName_create(name_type=1, names=[for_user_name])

    kdc_options = krb5_asn1.KDCOptions('forwardable')
    till = self.get_KerberosTime(offset=36000)
    ticket = rep['ticket']
    ticket_session_key = self.EncryptionKey_import(enc_part2['key'])
    pa_s4u = self.PA_S4U2Self_create(name=uname, realm=realm,
                                     tgt_session_key=ticket_session_key,
                                     ctype=pa_s4u2self_ctype)
    padata = [pa_s4u]

    subkey = self.RandomKey(ticket_session_key.etype)

    (ctime, cusec) = self.get_KerberosTimeWithUsec()

    req = self.TGS_REQ_create(padata=padata,
                              cusec=cusec,
                              ctime=ctime,
                              ticket=ticket,
                              kdc_options=str(kdc_options),
                              cname=cname,
                              realm=realm,
                              sname=tgs_sname,
                              from_time=None,
                              till_time=till,
                              renew_time=None,
                              nonce=0x7ffffffe,
                              etypes=etypes,
                              addresses=None,
                              EncAuthorizationData=None,
                              EncAuthorizationData_key=None,
                              additional_tickets=None,
                              ticket_session_key=ticket_session_key,
                              authenticator_subkey=subkey)
    rep = self.send_recv_transaction(req)
    self.assertIsNotNone(rep)

    msg_type = rep['msg-type']
    if msg_type == 13:
        # msg-type 13 is TGS-REP: make sure the encrypted part can be
        # decrypted with the subkey and decoded.
        enc_part2 = subkey.decrypt(
            KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
        enc_part2 = self.der_decode(
            enc_part2, asn1Spec=krb5_asn1.EncTGSRepPart())

    return msg_type
|
||
|
||
# Using the checksum type from the tgt_session_key happens to work
|
||
# everywhere
|
||
def test_s4u2self(self):
    # No explicit checksum type is passed; the request must be answered
    # with msg-type 13.
    self.assertEqual(self._test_s4u2self(), 13)
|
||
|
||
# Per spec, the checksum of PA-FOR-USER is HMAC_MD5, see [MS-SFU] 2.2.1
|
||
def test_s4u2self_hmac_md5_checksum(self):
    # An HMAC_MD5 checksum on the S4U2Self padata must be accepted
    # (reply msg-type 13).
    reply_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.HMAC_MD5)
    self.assertEqual(reply_type, 13)
|
||
|
||
def test_s4u2self_md5_unkeyed_checksum(self):
    # An MD5 checksum on the S4U2Self padata must be answered with
    # msg-type 30.
    reply_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.MD5)
    self.assertEqual(reply_type, 30)
|
||
|
||
def test_s4u2self_sha1_unkeyed_checksum(self):
    # A SHA1 checksum on the S4U2Self padata must be answered with
    # msg-type 30.
    reply_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.SHA1)
    self.assertEqual(reply_type, 30)
|
||
|
||
def test_s4u2self_crc32_unkeyed_checksum(self):
    # A CRC32 checksum on the S4U2Self padata must be answered with
    # msg-type 30.
    reply_type = self._test_s4u2self(pa_s4u2self_ctype=Cksumtype.CRC32)
    self.assertEqual(reply_type, 30)
|
||
|
||
def _run_s4u2self_test(self, kdc_dict):
    """Perform a single S4U2Self TGS exchange described by kdc_dict.

    Every recognised key is popped from kdc_dict; all are optional:
    client_opts, service_opts, modify_service_tgt_fn, service_name,
    expected_flags, unexpected_flags, expected_error_mode,
    expect_status, expected_status, kdc_options, etypes, expect_edata,
    expected_groups and unexpected_groups.  The test fails if any key is
    left over afterwards.
    """
    def to_ticket_flags(spec):
        # None means "nothing to check"; otherwise wrap in TicketFlags.
        return None if spec is None else krb5_asn1.TicketFlags(spec)

    # The account being impersonated and the service doing the asking.
    client_creds = self.get_cached_creds(
        account_type=self.AccountType.USER,
        opts=kdc_dict.pop('client_opts', None))
    service_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts=kdc_dict.pop('service_opts', None))

    # Fetch the service's TGT and let the caller tamper with it.
    service_tgt = self.get_tgt(service_creds)
    tgt_modifier = kdc_dict.pop('modify_service_tgt_fn', None)
    if tgt_modifier is not None:
        service_tgt = tgt_modifier(service_tgt)

    client_name = client_creds.get_username()
    client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                             names=[client_name])

    service_name = kdc_dict.pop('service_name', None)
    if service_name is None:
        # Default: the service account name minus its final character.
        service_name = service_creds.get_username()[:-1]
    service_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                              names=['host', service_name])

    realm = client_creds.get_realm()

    expected_flags = to_ticket_flags(kdc_dict.pop('expected_flags', None))
    unexpected_flags = to_ticket_flags(
        kdc_dict.pop('unexpected_flags', None))

    expected_error_mode = kdc_dict.pop('expected_error_mode', 0)
    expect_status = kdc_dict.pop('expect_status', None)
    expected_status = kdc_dict.pop('expected_status', None)
    if not expected_error_mode:
        check_error_fn = None
        check_rep_fn = self.generic_check_kdc_rep

        # Status expectations only make sense when an error is expected.
        self.assertIsNone(expect_status)
        self.assertIsNone(expected_status)
    else:
        check_error_fn = self.generic_check_kdc_error
        check_rep_fn = None

    kdc_options = krb5_asn1.KDCOptions(kdc_dict.pop('kdc_options', '0'))

    service_decryption_key = self.TicketDecryptionKey_from_creds(
        service_creds)

    authenticator_subkey = self.RandomKey(Enctype.AES256)

    default_etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
    etypes = kdc_dict.pop('etypes', default_etypes)

    expect_edata = kdc_dict.pop('expect_edata', None)
    expected_groups = kdc_dict.pop('expected_groups', None)
    unexpected_groups = kdc_dict.pop('unexpected_groups', None)

    def generate_s4u2self_padata(_kdc_exchange_dict,
                                 _callback_dict,
                                 req_body):
        # Name the client to impersonate, keyed with the (possibly
        # modified) service TGT's session key.
        s4u_padata = self.PA_S4U2Self_create(
            name=client_cname,
            realm=realm,
            tgt_session_key=service_tgt.session_key,
            ctype=None)

        return [s4u_padata], req_body

    kdc_exchange_dict = self.tgs_exchange_dict(
        expected_crealm=realm,
        expected_cname=client_cname,
        expected_srealm=realm,
        expected_sname=service_sname,
        expected_account_name=client_name,
        expected_groups=expected_groups,
        unexpected_groups=unexpected_groups,
        expected_sid=client_creds.get_sid(),
        expected_flags=expected_flags,
        unexpected_flags=unexpected_flags,
        ticket_decryption_key=service_decryption_key,
        expect_ticket_checksum=True,
        generate_padata_fn=generate_s4u2self_padata,
        check_error_fn=check_error_fn,
        check_rep_fn=check_rep_fn,
        check_kdc_private_fn=self.generic_check_kdc_private,
        expected_error_mode=expected_error_mode,
        expect_status=expect_status,
        expected_status=expected_status,
        tgt=service_tgt,
        authenticator_subkey=authenticator_subkey,
        kdc_options=str(kdc_options),
        expect_edata=expect_edata)

    self._generic_kdc_exchange(kdc_exchange_dict,
                               cname=None,
                               realm=realm,
                               sname=service_sname,
                               etypes=etypes)

    if not expected_error_mode:
        # A successful reply must carry a ticket that contains a PAC.
        issued_ticket = kdc_exchange_dict['rep_ticket_creds']
        self.assertIsNotNone(self.get_ticket_pac(issued_ticket))

    # Every parameter handed to us must have been consumed.
    self.assertEqual({}, kdc_dict)
|
||
|
||
# Test performing an S4U2Self operation with a forwardable ticket. The
|
||
# resulting ticket should have the 'forwardable' flag set.
|
||
def test_s4u2self_forwardable(self):
    # Forwardable service TGT + 'forwardable' requested: the issued
    # ticket is expected to be forwardable.
    make_forwardable = functools.partial(self.set_ticket_forwardable,
                                         flag=True)
    params = {
        'client_opts': {'not_delegated': False},
        'kdc_options': 'forwardable',
        'modify_service_tgt_fn': make_forwardable,
        'expected_flags': 'forwardable',
    }
    self._run_s4u2self_test(params)
|
||
|
||
# Test performing an S4U2Self operation with a forwardable ticket that does
|
||
# not contain a PAC. The request should fail.
|
||
def test_s4u2self_no_pac(self):
    # The service TGT is made forwardable and then stripped of its PAC;
    # the KDC is expected to reject it with KDC_ERR_TGT_REVOKED.
    def strip_pac_from_forwardable(tgt):
        return self.remove_ticket_pac(
            self.set_ticket_forwardable(tgt, flag=True))

    params = {
        'expected_error_mode': KDC_ERR_TGT_REVOKED,
        'client_opts': {'not_delegated': False},
        'kdc_options': 'forwardable',
        'modify_service_tgt_fn': strip_pac_from_forwardable,
        'expected_flags': 'forwardable',
        'expect_edata': False,
    }
    self._run_s4u2self_test(params)
|
||
|
||
# Test performing an S4U2Self operation without requesting a forwardable
|
||
# ticket. The resulting ticket should not have the 'forwardable' flag set.
|
||
def test_s4u2self_without_forwardable(self):
    # Forwardable service TGT, but 'forwardable' is not requested: the
    # issued ticket must not be forwardable.
    make_forwardable = functools.partial(self.set_ticket_forwardable,
                                         flag=True)
    params = {
        'client_opts': {'not_delegated': False},
        'modify_service_tgt_fn': make_forwardable,
        'unexpected_flags': 'forwardable',
    }
    self._run_s4u2self_test(params)
|
||
|
||
# Do an S4U2Self with a non-forwardable TGT. The 'forwardable' flag should
|
||
# not be set on the ticket.
|
||
def test_s4u2self_not_forwardable(self):
    # The service TGT has its forwardable flag cleared; even though
    # 'forwardable' is requested, the issued ticket must not have it.
    clear_forwardable = functools.partial(self.set_ticket_forwardable,
                                          flag=False)
    params = {
        'client_opts': {'not_delegated': False},
        'kdc_options': 'forwardable',
        'modify_service_tgt_fn': clear_forwardable,
        'unexpected_flags': 'forwardable',
    }
    self._run_s4u2self_test(params)
|
||
|
||
# Do an S4U2Self with the not_delegated flag set on the client. The
|
||
# 'forwardable' flag should not be set on the ticket.
|
||
def test_s4u2self_client_not_delegated(self):
    # The client account is created with not_delegated=True, so the
    # issued ticket must not be forwardable.
    make_forwardable = functools.partial(self.set_ticket_forwardable,
                                         flag=True)
    params = {
        'client_opts': {'not_delegated': True},
        'kdc_options': 'forwardable',
        'modify_service_tgt_fn': make_forwardable,
        'unexpected_flags': 'forwardable',
    }
    self._run_s4u2self_test(params)
|
||
|
||
# Do an S4U2Self with a service not trusted to authenticate for delegation,
|
||
# but having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
|
||
# flag should be set on the ticket.
|
||
def test_s4u2self_not_trusted_empty_allowed(self):
    # Service: not trusted to authenticate for delegation, empty
    # delegation target list.  The ticket is expected to be forwardable.
    make_forwardable = functools.partial(self.set_ticket_forwardable,
                                         flag=True)
    service_opts = {
        'trusted_to_auth_for_delegation': False,
        'delegation_to_spn': (),
    }
    params = {
        'client_opts': {'not_delegated': False},
        'service_opts': service_opts,
        'kdc_options': 'forwardable',
        'modify_service_tgt_fn': make_forwardable,
        'expected_flags': 'forwardable',
    }
    self._run_s4u2self_test(params)
|
||
|
||
# Do an S4U2Self with a service not trusted to authenticate for delegation
|
||
# and having a non-empty msDS-AllowedToDelegateTo attribute. The
|
||
# 'forwardable' flag should not be set on the ticket.
|
||
def test_s4u2self_not_trusted_nonempty_allowed(self):
    # Service: not trusted to authenticate for delegation, non-empty
    # delegation target list.  The ticket must not be forwardable.
    make_forwardable = functools.partial(self.set_ticket_forwardable,
                                         flag=True)
    service_opts = {
        'trusted_to_auth_for_delegation': False,
        'delegation_to_spn': ('test',),
    }
    params = {
        'client_opts': {'not_delegated': False},
        'service_opts': service_opts,
        'kdc_options': 'forwardable',
        'modify_service_tgt_fn': make_forwardable,
        'unexpected_flags': 'forwardable',
    }
    self._run_s4u2self_test(params)
|
||
|
||
# Do an S4U2Self with a service trusted to authenticate for delegation and
|
||
# having an empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
|
||
# flag should be set on the ticket.
|
||
def test_s4u2self_trusted_empty_allowed(self):
    # Service: trusted to authenticate for delegation, empty delegation
    # target list.  The ticket is expected to be forwardable.
    make_forwardable = functools.partial(self.set_ticket_forwardable,
                                         flag=True)
    service_opts = {
        'trusted_to_auth_for_delegation': True,
        'delegation_to_spn': (),
    }
    params = {
        'client_opts': {'not_delegated': False},
        'service_opts': service_opts,
        'kdc_options': 'forwardable',
        'modify_service_tgt_fn': make_forwardable,
        'expected_flags': 'forwardable',
    }
    self._run_s4u2self_test(params)
|
||
|
||
# Do an S4U2Self with a service trusted to authenticate for delegation and
|
||
# having a non-empty msDS-AllowedToDelegateTo attribute. The 'forwardable'
|
||
# flag should be set on the ticket.
|
||
def test_s4u2self_trusted_nonempty_allowed(self):
    # Service: trusted to authenticate for delegation, non-empty
    # delegation target list.  The ticket is expected to be forwardable.
    make_forwardable = functools.partial(self.set_ticket_forwardable,
                                         flag=True)
    service_opts = {
        'trusted_to_auth_for_delegation': True,
        'delegation_to_spn': ('test',),
    }
    params = {
        'client_opts': {'not_delegated': False},
        'service_opts': service_opts,
        'kdc_options': 'forwardable',
        'modify_service_tgt_fn': make_forwardable,
        'expected_flags': 'forwardable',
    }
    self._run_s4u2self_test(params)
|
||
|
||
# Do an S4U2Self with the sname in the request different to that of the
|
||
# service. We expect an error.
|
||
def test_s4u2self_wrong_sname(self):
    # Request a ticket for a different computer account's name than the
    # one that owns the TGT; KDC_ERR_BADMATCH is expected.
    other_account_opts = {
        'trusted_to_auth_for_delegation': True,
        'id': 0,
    }
    other_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts=other_account_opts)
    # Account name minus its final character.
    other_sname = other_creds.get_username()[:-1]

    make_forwardable = functools.partial(self.set_ticket_forwardable,
                                         flag=True)
    params = {
        'expected_error_mode': KDC_ERR_BADMATCH,
        'expect_edata': False,
        'client_opts': {'not_delegated': False},
        'service_opts': {'trusted_to_auth_for_delegation': True},
        'service_name': other_sname,
        'kdc_options': 'forwardable',
        'modify_service_tgt_fn': make_forwardable,
    }
    self._run_s4u2self_test(params)
|
||
|
||
# Do an S4U2Self where the service does not require authorization data. The
|
||
# resulting ticket should still contain a PAC.
|
||
def test_s4u2self_no_auth_data_required(self):
    # The service account is created with no_auth_data_required=True.
    # The helper still checks that a successful reply carries a PAC.
    make_forwardable = functools.partial(self.set_ticket_forwardable,
                                         flag=True)
    service_opts = {
        'trusted_to_auth_for_delegation': True,
        'no_auth_data_required': True,
    }
    params = {
        'client_opts': {'not_delegated': False},
        'service_opts': service_opts,
        'kdc_options': 'forwardable',
        'modify_service_tgt_fn': make_forwardable,
        'expected_flags': 'forwardable',
    }
    self._run_s4u2self_test(params)
|
||
|
||
# Do an S4U2Self and check that the service asserted identity is part of
|
||
# the sids.
|
||
def test_s4u2self_asserted_identity(self):
    # The service asserted identity SID must be present as an extra SID,
    # and the authentication authority asserted identity SID must not.
    service_asserted = (security.SID_SERVICE_ASSERTED_IDENTITY,
                        SidType.EXTRA_SID,
                        self.default_attrs)
    params = {
        'client_opts': {'not_delegated': False},
        # NOTE(review): the Ellipsis member presumably marks the set as
        # non-exhaustive for the group checker - confirm.
        'expected_groups': {service_asserted, ...},
        'unexpected_groups': {
            security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
        },
    }
    self._run_s4u2self_test(params)
|
||
|
||
def _run_delegation_test(self, kdc_dict):
    """Run one delegation (S4U2Proxy) exchange configured by kdc_dict.

    service1 obtains a ticket to service2 on behalf of the client,
    presenting the client's ticket to service1 as an additional ticket.
    That client ticket is obtained either through a normal TGT and
    service-ticket request, or through S4U2Self when 's4u2self' is set.

    Keys popped from kdc_dict ('expected_error_mode' is required, all
    others are optional): s4u2self, authtime_delay, client_opts,
    service1_opts, service2_opts, allow_delegation, allow_rbcd,
    expect_pac, expected_groups, unexpected_groups, client_tkt_options,
    etypes, modify_client_tkt_fn, modify_service_tgt_fn, kdc_options,
    expected_error_mode, expect_status, expected_status, expect_edata,
    pac_options, use_authenticator_subkey,
    expected_transited_services and enc-authorization-data.  The test
    fails if any key is left over afterwards.

    The option dicts and the transited-services list supplied by the
    caller are copied before being extended, so the caller's objects
    are left untouched.
    """
    s4u2self = kdc_dict.pop('s4u2self', False)

    authtime_delay = kdc_dict.pop('authtime_delay', 0)

    client_opts = kdc_dict.pop('client_opts', None)
    client_creds = self.get_cached_creds(
        account_type=self.AccountType.USER,
        opts=client_opts)

    sid = client_creds.get_sid()

    # Work on private copies: entries are added to these dicts below and
    # the caller's objects must not be modified behind its back.
    service1_opts = dict(kdc_dict.pop('service1_opts', None) or {})
    service2_opts = dict(kdc_dict.pop('service2_opts', None) or {})

    allow_delegation = kdc_dict.pop('allow_delegation', False)
    allow_rbcd = kdc_dict.pop('allow_rbcd', False)
    # The two ways of permitting delegation are mutually exclusive here.
    self.assertFalse(allow_delegation and allow_rbcd)

    if allow_rbcd:
        # service2 must reference service1's DN, so service1 has to be
        # created first.
        service1_creds = self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts=service1_opts)

        self.assertNotIn('delegation_from_dn', service2_opts)
        service2_opts['delegation_from_dn'] = str(service1_creds.get_dn())

        service2_creds = self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts=service2_opts)
    else:
        # service1 may need to reference service2's SPN, so service2 has
        # to be created first.
        service2_creds = self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts=service2_opts)

        if allow_delegation:
            self.assertNotIn('delegation_to_spn', service1_opts)
            service1_opts['delegation_to_spn'] = service2_creds.get_spn()

        service1_creds = self.get_cached_creds(
            account_type=self.AccountType.COMPUTER,
            opts=service1_opts)

    service1_tgt = self.get_tgt(service1_creds)
    self.assertElementPresent(service1_tgt.ticket_private, 'authtime')
    service1_tgt_authtime = self.getElementValue(
        service1_tgt.ticket_private, 'authtime')

    client_username = client_creds.get_username()
    client_realm = client_creds.get_realm()
    client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
                                             names=[client_username])

    # Service names are the account name minus its final character.
    service1_name = service1_creds.get_username()[:-1]
    service1_realm = service1_creds.get_realm()
    service1_service = 'host'
    service1_sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL, names=[service1_service,
                                       service1_name])
    service1_decryption_key = self.TicketDecryptionKey_from_creds(
        service1_creds)

    expect_pac = kdc_dict.pop('expect_pac', True)

    expected_groups = kdc_dict.pop('expected_groups', None)
    unexpected_groups = kdc_dict.pop('unexpected_groups', None)

    client_tkt_options = kdc_dict.pop('client_tkt_options', 'forwardable')
    expected_flags = krb5_asn1.TicketFlags(client_tkt_options)

    etypes = kdc_dict.pop('etypes', (AES256_CTS_HMAC_SHA1_96,
                                     ARCFOUR_HMAC_MD5))

    if s4u2self:
        # The authtime delay is only implemented for the non-S4U2Self
        # path below.
        self.assertEqual(authtime_delay, 0)

        def generate_s4u2self_padata(_kdc_exchange_dict,
                                     _callback_dict,
                                     req_body):
            pa_s4u = self.PA_S4U2Self_create(
                name=client_cname,
                realm=client_realm,
                tgt_session_key=service1_tgt.session_key,
                ctype=None)

            return [pa_s4u], req_body

        s4u2self_expected_flags = krb5_asn1.TicketFlags('forwardable')
        s4u2self_unexpected_flags = krb5_asn1.TicketFlags('0')

        s4u2self_kdc_options = krb5_asn1.KDCOptions('forwardable')

        s4u2self_authenticator_subkey = self.RandomKey(Enctype.AES256)
        s4u2self_kdc_exchange_dict = self.tgs_exchange_dict(
            expected_crealm=client_realm,
            expected_cname=client_cname,
            expected_srealm=service1_realm,
            expected_sname=service1_sname,
            expected_account_name=client_username,
            expected_groups=expected_groups,
            unexpected_groups=unexpected_groups,
            expected_sid=sid,
            expected_flags=s4u2self_expected_flags,
            unexpected_flags=s4u2self_unexpected_flags,
            ticket_decryption_key=service1_decryption_key,
            generate_padata_fn=generate_s4u2self_padata,
            check_rep_fn=self.generic_check_kdc_rep,
            check_kdc_private_fn=self.generic_check_kdc_private,
            tgt=service1_tgt,
            authenticator_subkey=s4u2self_authenticator_subkey,
            kdc_options=str(s4u2self_kdc_options),
            expect_edata=False)

        self._generic_kdc_exchange(s4u2self_kdc_exchange_dict,
                                   cname=None,
                                   realm=service1_realm,
                                   sname=service1_sname,
                                   etypes=etypes)

        client_service_tkt = s4u2self_kdc_exchange_dict['rep_ticket_creds']
    else:
        # Optionally wait, then ask for fresh tickets, so that the
        # client's authtime can differ from the service1 TGT's.
        fresh = authtime_delay != 0
        if fresh:
            time.sleep(authtime_delay)

        client_tgt = self.get_tgt(client_creds,
                                  kdc_options=client_tkt_options,
                                  expected_flags=expected_flags,
                                  fresh=fresh)
        client_service_tkt = self.get_service_ticket(
            client_tgt,
            service1_creds,
            kdc_options=client_tkt_options,
            expected_flags=expected_flags,
            fresh=fresh)

    modify_client_tkt_fn = kdc_dict.pop('modify_client_tkt_fn', None)
    if modify_client_tkt_fn is not None:
        client_service_tkt = modify_client_tkt_fn(client_service_tkt)

    self.assertElementPresent(client_service_tkt.ticket_private,
                              'authtime')
    expected_authtime = self.getElementValue(
        client_service_tkt.ticket_private, 'authtime')
    if authtime_delay > 1:
        self.assertNotEqual(expected_authtime, service1_tgt_authtime)

    additional_tickets = [client_service_tkt.ticket]

    modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
    if modify_service_tgt_fn is not None:
        service1_tgt = modify_service_tgt_fn(service1_tgt)

    kdc_options = kdc_dict.pop('kdc_options', None)
    if kdc_options is None:
        kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))

    service2_name = service2_creds.get_username()[:-1]
    service2_realm = service2_creds.get_realm()
    service2_service = 'host'
    service2_sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL, names=[service2_service,
                                       service2_name])
    service2_decryption_key = self.TicketDecryptionKey_from_creds(
        service2_creds)
    service2_etypes = service2_creds.tgs_supported_enctypes

    # No default: every caller has to state whether an error is expected.
    expected_error_mode = kdc_dict.pop('expected_error_mode')
    expect_status = kdc_dict.pop('expect_status', None)
    expected_status = kdc_dict.pop('expected_status', None)
    if expected_error_mode:
        check_error_fn = self.generic_check_kdc_error
        check_rep_fn = None
    else:
        check_error_fn = None
        check_rep_fn = self.generic_check_kdc_rep

        # Status expectations only make sense when an error is expected.
        self.assertIsNone(expect_status)
        self.assertIsNone(expected_status)

    expect_edata = kdc_dict.pop('expect_edata', None)
    if expect_edata is not None:
        self.assertTrue(expected_error_mode)

    pac_options = kdc_dict.pop('pac_options', None)

    use_authenticator_subkey = kdc_dict.pop('use_authenticator_subkey',
                                            True)
    if use_authenticator_subkey:
        authenticator_subkey = self.RandomKey(Enctype.AES256)
    else:
        authenticator_subkey = None

    expected_proxy_target = service2_creds.get_spn()

    # Copy before appending so the caller's list (which may be shared
    # with other callbacks) is not modified.
    expected_transited_services = list(kdc_dict.pop(
        'expected_transited_services', []))

    transited_service = f'host/{service1_name}@{service1_realm}'
    expected_transited_services.append(transited_service)

    kdc_exchange_dict = self.tgs_exchange_dict(
        expected_crealm=client_realm,
        expected_cname=client_cname,
        expected_srealm=service2_realm,
        expected_sname=service2_sname,
        expected_account_name=client_username,
        expected_groups=expected_groups,
        unexpected_groups=unexpected_groups,
        expected_sid=sid,
        expected_supported_etypes=service2_etypes,
        ticket_decryption_key=service2_decryption_key,
        check_error_fn=check_error_fn,
        check_rep_fn=check_rep_fn,
        check_kdc_private_fn=self.generic_check_kdc_private,
        expected_error_mode=expected_error_mode,
        expect_status=expect_status,
        expected_status=expected_status,
        callback_dict={},
        tgt=service1_tgt,
        authenticator_subkey=authenticator_subkey,
        kdc_options=kdc_options,
        pac_options=pac_options,
        expect_edata=expect_edata,
        expected_proxy_target=expected_proxy_target,
        expected_transited_services=expected_transited_services,
        expect_pac=expect_pac)

    EncAuthorizationData = kdc_dict.pop('enc-authorization-data', None)

    if EncAuthorizationData is not None:
        # Pick the key and key usage matching whether a subkey is sent.
        if authenticator_subkey is not None:
            EncAuthorizationData_key = authenticator_subkey
            EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
        else:
            EncAuthorizationData_key = client_service_tkt.session_key
            EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
    else:
        EncAuthorizationData_key = None
        EncAuthorizationData_usage = None

    self._generic_kdc_exchange(
        kdc_exchange_dict,
        cname=None,
        realm=service2_realm,
        sname=service2_sname,
        etypes=etypes,
        additional_tickets=additional_tickets,
        EncAuthorizationData=EncAuthorizationData,
        EncAuthorizationData_key=EncAuthorizationData_key,
        EncAuthorizationData_usage=EncAuthorizationData_usage)

    if not expected_error_mode:
        # Check whether the ticket contains a PAC.
        ticket = kdc_exchange_dict['rep_ticket_creds']
        # The issued ticket must carry the client ticket's authtime.
        self.assertElementEqual(ticket.ticket_private, 'authtime',
                                expected_authtime)
        pac = self.get_ticket_pac(ticket, expect_pac=expect_pac)
        ticket_auth_data = ticket.ticket_private.get('authorization-data')
        expected_num_ticket_auth_data = 0
        if expect_pac:
            self.assertIsNotNone(pac)
            expected_num_ticket_auth_data += 1
        else:
            self.assertIsNone(pac)

        if EncAuthorizationData is not None:
            expected_num_ticket_auth_data += len(EncAuthorizationData)

        if expected_num_ticket_auth_data == 0:
            self.assertIsNone(ticket_auth_data)
        else:
            self.assertIsNotNone(ticket_auth_data)
            self.assertEqual(len(ticket_auth_data),
                             expected_num_ticket_auth_data)

        if EncAuthorizationData is not None:
            # Round-trip the request data through DER so it has the same
            # representation as the decoded reply data.
            enc_ad_plain = self.der_encode(
                EncAuthorizationData,
                asn1Spec=krb5_asn1.AuthorizationData())
            req_EncAuthorizationData = self.der_decode(
                enc_ad_plain,
                asn1Spec=krb5_asn1.AuthorizationData())

            rep_EncAuthorizationData = ticket_auth_data.copy()
            if expect_pac:
                # Drop the first element, which is counted as the PAC.
                rep_EncAuthorizationData.pop(0)
            self.assertEqual(rep_EncAuthorizationData,
                             req_EncAuthorizationData)

    # Ensure we used all the parameters given to us.
    self.assertEqual({}, kdc_dict)
|
||
|
||
def skip_unless_fl2008(self):
    """Skip the running test when the domain functional level is below
    DS_DOMAIN_FUNCTION_2008."""
    domain_level = self.get_domain_functional_level(self.get_samdb())
    if domain_level >= dsdb.DS_DOMAIN_FUNCTION_2008:
        return

    self.skipTest('RBCD requires FL2008')
|
||
|
||
def test_constrained_delegation(self):
    # Plain constrained delegation with delegation allowed; success is
    # expected.
    params = {
        'expected_error_mode': 0,
        'allow_delegation': True,
    }
    self._run_delegation_test(params)
|
||
|
||
def test_constrained_delegation_authtime(self):
    # Constrained delegation with a two second delay before the client
    # tickets are requested, so the authtime checks in the helper apply.
    params = {
        'expected_error_mode': 0,
        'allow_delegation': True,
        'authtime_delay': 2,
    }
    self._run_delegation_test(params)
|
||
|
||
def test_constrained_delegation_with_enc_auth_data_subkey(self):
    # Constrained delegation with enc-authorization-data in the request.
    # The helper's default (authenticator subkey in use) is kept.
    relevant_elems = [
        self.AuthorizationData_create(777, b'AuthorizationData777'),
        self.AuthorizationData_create(999, b'AuthorizationData999'),
    ]
    # Wrap both elements in a single AD-IF-RELEVANT container.
    ad_relevant = self.der_encode(relevant_elems,
                                  asn1Spec=krb5_asn1.AD_IF_RELEVANT())
    EncAuthorizationData = [
        self.AuthorizationData_create(AD_IF_RELEVANT, ad_relevant),
    ]
    params = {
        'expected_error_mode': 0,
        'allow_delegation': True,
        'enc-authorization-data': EncAuthorizationData,
    }
    self._run_delegation_test(params)
|
||
|
||
def test_constrained_delegation_with_enc_auth_data_no_subkey(self):
    # Constrained delegation with enc-authorization-data in the request
    # and the authenticator subkey disabled.
    relevant_elems = [
        self.AuthorizationData_create(777, b'AuthorizationData777'),
        self.AuthorizationData_create(999, b'AuthorizationData999'),
    ]
    # Wrap both elements in a single AD-IF-RELEVANT container.
    ad_relevant = self.der_encode(relevant_elems,
                                  asn1Spec=krb5_asn1.AD_IF_RELEVANT())
    EncAuthorizationData = [
        self.AuthorizationData_create(AD_IF_RELEVANT, ad_relevant),
    ]
    params = {
        'expected_error_mode': 0,
        'allow_delegation': True,
        'enc-authorization-data': EncAuthorizationData,
        'use_authenticator_subkey': False,
    }
    self._run_delegation_test(params)
|
||
|
||
def test_constrained_delegation_authentication_asserted_identity(self):
    # Without S4U2Self the asserted identity must be the authentication
    # authority one; the service asserted identity SID must be absent.
    authority_asserted = (
        security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
        SidType.EXTRA_SID,
        self.default_attrs)
    params = {
        'expected_error_mode': 0,
        'allow_delegation': True,
        # NOTE(review): the Ellipsis member presumably marks the set as
        # non-exhaustive for the group checker - confirm.
        'expected_groups': {authority_asserted, ...},
        'unexpected_groups': {
            security.SID_SERVICE_ASSERTED_IDENTITY,
        },
    }
    self._run_delegation_test(params)
|
||
|
||
def test_constrained_delegation_service_asserted_identity(self):
    # Constrained delegation combined with S4U2Self (S4U2Proxy + S4U2Self):
    # the service asserted identity SID must be present, and the
    # authentication authority one must not.
    service_identity = (
        security.SID_SERVICE_ASSERTED_IDENTITY,
        SidType.EXTRA_SID,
        self.default_attrs,
    )

    test_params = {
        'expected_error_mode': 0,
        'allow_delegation': True,
        's4u2self': True,
        'service1_opts': {'trusted_to_auth_for_delegation': True},
        # The Ellipsis member is handed to the test runner as-is.
        'expected_groups': {service_identity, ...},
        'unexpected_groups': {
            security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
        },
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_no_auth_data_required(self):
    # Constrained delegation where service2's options set
    # 'no_auth_data_required'; no PAC is expected.
    service2_opts = {'no_auth_data_required': True}

    test_params = {
        'expected_error_mode': 0,
        'allow_delegation': True,
        'service2_opts': service2_opts,
        'expect_pac': False,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_existing_delegation_info(self):
    # Constrained delegation where the client ticket's PAC already holds an
    # S4U_DELEGATION_INFO structure (inserted by add_delegation_info()).
    services = ['service1', 'service2', 'service3']
    insert_delegation_info = functools.partial(self.add_delegation_info,
                                               services=services)

    test_params = {
        'expected_error_mode': 0,
        'allow_delegation': True,
        'modify_client_tkt_fn': insert_delegation_info,
        'expected_transited_services': services,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_not_allowed(self):
    # Constrained delegation attempted although the delegating service
    # does not allow it.
    test_params = {
        'expected_error_mode': KDC_ERR_BADOPTION,
        # Presence or absence of an NTSTATUS is not insisted upon.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
        'allow_delegation': False,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_no_client_pac(self):
    # Constrained delegation where the client service ticket has had its
    # PAC removed.
    acceptable_errors = (KDC_ERR_MODIFIED, KDC_ERR_TGT_REVOKED)

    test_params = {
        'expected_error_mode': acceptable_errors,
        'allow_delegation': True,
        'modify_client_tkt_fn': self.remove_ticket_pac,
        'expect_edata': False,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_no_service_pac(self):
    # Constrained delegation where the service TGT has had its PAC removed.
    test_params = {
        'expected_error_mode': KDC_ERR_TGT_REVOKED,
        'allow_delegation': True,
        'modify_service_tgt_fn': self.remove_ticket_pac,
        'expect_edata': False,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_no_client_pac_no_auth_data_required(self):
    # Constrained delegation where the client service ticket has had its
    # PAC removed and service2's options set 'no_auth_data_required'.
    acceptable_errors = (KDC_ERR_MODIFIED,
                         KDC_ERR_BADOPTION,
                         KDC_ERR_TGT_REVOKED)

    test_params = {
        'expected_error_mode': acceptable_errors,
        'allow_delegation': True,
        'modify_client_tkt_fn': self.remove_ticket_pac,
        'expect_edata': False,
        'service2_opts': {'no_auth_data_required': True},
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_no_service_pac_no_auth_data_required(self):
    # Constrained delegation where the service TGT has had its PAC removed
    # and service2's options set 'no_auth_data_required'.
    test_params = {
        'expected_error_mode': KDC_ERR_TGT_REVOKED,
        'allow_delegation': True,
        'modify_service_tgt_fn': self.remove_ticket_pac,
        'service2_opts': {'no_auth_data_required': True},
        'expect_pac': False,
        'expect_edata': False,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_non_forwardable(self):
    # Constrained delegation using a client ticket whose forwardable flag
    # has been cleared.
    clear_forwardable = functools.partial(self.set_ticket_forwardable,
                                          flag=False)

    test_params = {
        'expected_error_mode': KDC_ERR_BADOPTION,
        # Presence or absence of an NTSTATUS is not insisted upon.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
        'allow_delegation': True,
        'modify_client_tkt_fn': clear_forwardable,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_pac_options_rbcd(self):
    # Ordinary constrained delegation, except that the RBCD bit is set in
    # the PAC options.
    test_params = {
        'expected_error_mode': 0,
        'pac_options': '0001',  # supports RBCD
        'allow_delegation': True,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd(self):
    # Resource-based constrained delegation, success case.
    self.skip_unless_fl2008()

    test_params = {
        'expected_error_mode': 0,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_no_auth_data_required(self):
    # RBCD where service2's options set 'no_auth_data_required'; no PAC is
    # expected.
    self.skip_unless_fl2008()

    test_params = {
        'expected_error_mode': 0,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
        'service2_opts': {'no_auth_data_required': True},
        'expect_pac': False,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_existing_delegation_info(self):
    self.skip_unless_fl2008()

    # RBCD where the client ticket's PAC already holds an
    # S4U_DELEGATION_INFO structure (inserted by add_delegation_info()).
    services = ['service1', 'service2', 'service3']
    insert_delegation_info = functools.partial(self.add_delegation_info,
                                               services=services)

    test_params = {
        'expected_error_mode': 0,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
        'modify_client_tkt_fn': insert_delegation_info,
        'expected_transited_services': services,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_not_allowed(self):
    # Resource-based constrained delegation attempted although the target
    # service does not allow it.
    test_params = {
        'expected_error_mode': KDC_ERR_BADOPTION,
        # Presence or absence of an NTSTATUS is not insisted upon.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_NOT_FOUND,
        'allow_rbcd': False,
        'pac_options': '0001',  # supports RBCD
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_no_client_pac_a(self):
    self.skip_unless_fl2008()

    # The client service ticket has had its PAC removed, and the
    # msDS-AllowedToDelegateTo attribute is empty.
    acceptable_errors = (KDC_ERR_MODIFIED, KDC_ERR_TGT_REVOKED)

    test_params = {
        'expected_error_mode': acceptable_errors,
        # Presence or absence of an NTSTATUS is not insisted upon.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
        'modify_client_tkt_fn': self.remove_ticket_pac,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_no_client_pac_b(self):
    self.skip_unless_fl2008()

    # The client service ticket has had its PAC removed, and the
    # msDS-AllowedToDelegateTo attribute is non-empty.
    acceptable_errors = (KDC_ERR_MODIFIED, KDC_ERR_TGT_REVOKED)

    test_params = {
        'expected_error_mode': acceptable_errors,
        # Presence or absence of an NTSTATUS is not insisted upon.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_NO_MATCH,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
        'modify_client_tkt_fn': self.remove_ticket_pac,
        'service1_opts': {'delegation_to_spn': 'host/test'},
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_no_service_pac(self):
    self.skip_unless_fl2008()

    # RBCD where the service TGT has had its PAC removed.
    test_params = {
        'expected_error_mode': KDC_ERR_TGT_REVOKED,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
        'modify_service_tgt_fn': self.remove_ticket_pac,
        'expect_edata': False,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_no_client_pac_no_auth_data_required_a(self):
    self.skip_unless_fl2008()

    # The client service ticket has had its PAC removed, the
    # msDS-AllowedToDelegateTo attribute is empty, and service2's options
    # set 'no_auth_data_required'.
    acceptable_errors = (KDC_ERR_MODIFIED, KDC_ERR_TGT_REVOKED)

    test_params = {
        'expected_error_mode': acceptable_errors,
        # Presence or absence of an NTSTATUS is not insisted upon.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
        'modify_client_tkt_fn': self.remove_ticket_pac,
        'service2_opts': {'no_auth_data_required': True},
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_no_client_pac_no_auth_data_required_b(self):
    self.skip_unless_fl2008()

    # The client service ticket has had its PAC removed, the
    # msDS-AllowedToDelegateTo attribute is non-empty, and service2's
    # options set 'no_auth_data_required'.
    acceptable_errors = (KDC_ERR_MODIFIED, KDC_ERR_TGT_REVOKED)

    test_params = {
        'expected_error_mode': acceptable_errors,
        # Presence or absence of an NTSTATUS is not insisted upon.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_NO_MATCH,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
        'modify_client_tkt_fn': self.remove_ticket_pac,
        'service1_opts': {'delegation_to_spn': 'host/test'},
        'service2_opts': {'no_auth_data_required': True},
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_no_service_pac_no_auth_data_required(self):
    self.skip_unless_fl2008()

    # RBCD where the service TGT has had its PAC removed and service2's
    # options set 'no_auth_data_required'.
    test_params = {
        'expected_error_mode': KDC_ERR_TGT_REVOKED,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
        'modify_service_tgt_fn': self.remove_ticket_pac,
        'service2_opts': {'no_auth_data_required': True},
        'expect_edata': False,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_non_forwardable(self):
    self.skip_unless_fl2008()

    # RBCD using a client ticket whose forwardable flag has been cleared.
    clear_forwardable = functools.partial(self.set_ticket_forwardable,
                                          flag=False)

    test_params = {
        'expected_error_mode': KDC_ERR_BADOPTION,
        # Presence or absence of an NTSTATUS is not insisted upon.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
        'modify_client_tkt_fn': clear_forwardable,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_no_pac_options_a(self):
    self.skip_unless_fl2008()

    # RBCD requested without the RBCD bit in the PAC options, and with an
    # empty msDS-AllowedToDelegateTo attribute.
    test_params = {
        'expected_error_mode': KDC_ERR_BADOPTION,
        # Presence or absence of an NTSTATUS is not insisted upon.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
        'allow_rbcd': True,
        'pac_options': '1',  # does not support RBCD
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_no_pac_options_b(self):
    self.skip_unless_fl2008()

    # RBCD requested without the RBCD bit in the PAC options, and with a
    # non-empty msDS-AllowedToDelegateTo attribute.
    test_params = {
        'expected_error_mode': KDC_ERR_BADOPTION,
        # Presence or absence of an NTSTATUS is not insisted upon.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_NO_MATCH,
        'allow_rbcd': True,
        'pac_options': '1',  # does not support RBCD
        'service1_opts': {'delegation_to_spn': 'host/test'},
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_bronze_bit_constrained_delegation_old_checksum(self):
    # Flip the forwardable flag on the ticket while leaving the PAC
    # checksums as they were.
    force_forwardable = functools.partial(self.set_ticket_forwardable,
                                          flag=True,
                                          update_pac_checksums=False)

    test_params = {
        'expected_error_mode': (KDC_ERR_MODIFIED, KDC_ERR_BAD_INTEGRITY),
        'allow_delegation': True,
        'client_tkt_options': '0',  # non-forwardable ticket
        'modify_client_tkt_fn': force_forwardable,
        'expect_edata': False,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_bronze_bit_rbcd_old_checksum(self):
    self.skip_unless_fl2008()

    # Flip the forwardable flag on the ticket while leaving the PAC
    # checksums as they were.
    force_forwardable = functools.partial(self.set_ticket_forwardable,
                                          flag=True,
                                          update_pac_checksums=False)

    test_params = {
        'expected_error_mode': (KDC_ERR_MODIFIED, KDC_ERR_BAD_INTEGRITY),
        # Presence or absence of an NTSTATUS is not insisted upon.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
        'client_tkt_options': '0',  # non-forwardable ticket
        'modify_client_tkt_fn': force_forwardable,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_missing_client_checksum(self):
    # Present a user ticket lacking each required PAC checksum in turn.
    for checksum in self.pac_checksum_types:
        with self.subTest(checksum=checksum):
            # A missing ticket checksum has its own set of accepted errors.
            expected_error_mode = (
                (KDC_ERR_MODIFIED, KDC_ERR_BADOPTION)
                if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM
                else KDC_ERR_GENERIC)

            strip_checksum = functools.partial(self.remove_pac_checksum,
                                               checksum=checksum)
            test_params = {
                'expected_error_mode': expected_error_mode,
                'allow_delegation': True,
                'modify_client_tkt_fn': strip_checksum,
                'expect_edata': False,
            }
            self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_missing_service_checksum(self):
    # Present the service's ticket lacking the server or KDC checksum.
    service_checksums = (krb5pac.PAC_TYPE_SRV_CHECKSUM,
                         krb5pac.PAC_TYPE_KDC_CHECKSUM)

    for checksum in service_checksums:
        with self.subTest(checksum=checksum):
            strip_checksum = functools.partial(self.remove_pac_checksum,
                                               checksum=checksum)
            test_params = {
                'expected_error_mode': KDC_ERR_GENERIC,
                # Presence or absence of an NTSTATUS is not insisted upon.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
                'allow_delegation': True,
                'modify_service_tgt_fn': strip_checksum,
            }
            self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_missing_client_checksum(self):
    self.skip_unless_fl2008()

    # Present a user ticket lacking each required PAC checksum in turn.
    for checksum in self.pac_checksum_types:
        with self.subTest(checksum=checksum):
            # A missing ticket checksum has its own set of accepted errors.
            expected_error_mode = (
                (KDC_ERR_MODIFIED, KDC_ERR_BADOPTION)
                if checksum == krb5pac.PAC_TYPE_TICKET_CHECKSUM
                else KDC_ERR_GENERIC)

            strip_checksum = functools.partial(self.remove_pac_checksum,
                                               checksum=checksum)
            test_params = {
                'expected_error_mode': expected_error_mode,
                # Presence or absence of an NTSTATUS is not insisted upon.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_client_tkt_fn': strip_checksum,
            }
            self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_missing_service_checksum(self):
    self.skip_unless_fl2008()

    # Present the service's ticket lacking the server or KDC checksum.
    service_checksums = (krb5pac.PAC_TYPE_SRV_CHECKSUM,
                         krb5pac.PAC_TYPE_KDC_CHECKSUM)

    for checksum in service_checksums:
        with self.subTest(checksum=checksum):
            strip_checksum = functools.partial(self.remove_pac_checksum,
                                               checksum=checksum)
            test_params = {
                'expected_error_mode': KDC_ERR_GENERIC,
                # Presence or absence of an NTSTATUS is not insisted upon.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_service_tgt_fn': strip_checksum,
            }
            self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_zeroed_client_checksum(self):
    # Present a user ticket in which each PAC checksum in turn is invalid
    # (produced by zeroed_pac_checksum()).
    acceptable_errors = (KDC_ERR_MODIFIED, KDC_ERR_BAD_INTEGRITY)

    for checksum in self.pac_checksum_types:
        with self.subTest(checksum=checksum):
            zero_checksum = functools.partial(self.zeroed_pac_checksum,
                                              checksum=checksum)
            test_params = {
                'expected_error_mode': acceptable_errors,
                'allow_delegation': True,
                'modify_client_tkt_fn': zero_checksum,
                'expect_edata': False,
            }
            self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_zeroed_service_checksum(self):
    # Present the service's ticket in which each PAC checksum in turn is
    # invalid (produced by zeroed_pac_checksum()).
    for checksum in self.pac_checksum_types:
        with self.subTest(checksum=checksum):
            # Success is expected unless the server checksum is the one
            # that was invalidated.
            expected_error_mode = 0
            expected_status = None
            if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
                expected_error_mode = (KDC_ERR_MODIFIED,
                                       KDC_ERR_BAD_INTEGRITY)
                expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD

            zero_checksum = functools.partial(self.zeroed_pac_checksum,
                                              checksum=checksum)
            test_params = {
                'expected_error_mode': expected_error_mode,
                # Presence or absence of an NTSTATUS is not insisted upon.
                'expect_status': None,
                'expected_status': expected_status,
                'allow_delegation': True,
                'modify_service_tgt_fn': zero_checksum,
            }
            self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_zeroed_client_checksum(self):
    self.skip_unless_fl2008()

    # Present a user ticket in which each PAC checksum in turn is invalid
    # (produced by zeroed_pac_checksum()).
    acceptable_errors = (KDC_ERR_MODIFIED, KDC_ERR_BAD_INTEGRITY)

    for checksum in self.pac_checksum_types:
        with self.subTest(checksum=checksum):
            zero_checksum = functools.partial(self.zeroed_pac_checksum,
                                              checksum=checksum)
            test_params = {
                'expected_error_mode': acceptable_errors,
                # Presence or absence of an NTSTATUS is not insisted upon.
                'expect_status': None,
                'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_client_tkt_fn': zero_checksum,
            }
            self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_zeroed_service_checksum(self):
    self.skip_unless_fl2008()

    # Present the service's ticket in which each PAC checksum in turn is
    # invalid (produced by zeroed_pac_checksum()).
    for checksum in self.pac_checksum_types:
        with self.subTest(checksum=checksum):
            # Success is expected unless the server checksum is the one
            # that was invalidated.
            expected_error_mode = 0
            expected_status = None
            if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
                expected_error_mode = (KDC_ERR_MODIFIED,
                                       KDC_ERR_BAD_INTEGRITY)
                expected_status = ntstatus.NT_STATUS_WRONG_PASSWORD

            zero_checksum = functools.partial(self.zeroed_pac_checksum,
                                              checksum=checksum)
            test_params = {
                'expected_error_mode': expected_error_mode,
                # Presence or absence of an NTSTATUS is not insisted upon.
                'expect_status': None,
                'expected_status': expected_status,
                'allow_rbcd': True,
                'pac_options': '0001',  # supports RBCD
                'modify_service_tgt_fn': zero_checksum,
            }
            self._run_delegation_test(test_params)
|
||
|
||
# Checksum types iterated over by the *_unkeyed_*_checksum tests.
unkeyed_ctypes = {Cksumtype.MD5, Cksumtype.SHA1, Cksumtype.CRC32}
|
||
|
||
def test_constrained_delegation_unkeyed_client_checksum(self):
    # Present a user ticket in which one PAC checksum has been replaced by
    # a checksum of each type in unkeyed_ctypes.
    for checksum in self.pac_checksum_types:
        for ctype in self.unkeyed_ctypes:
            with self.subTest(checksum=checksum, ctype=ctype):
                sha1_server_checksum = (
                    checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
                    and ctype == Cksumtype.SHA1)
                expected_error_mode = (
                    (KDC_ERR_SUMTYPE_NOSUPP, KDC_ERR_INAPP_CKSUM)
                    if sha1_server_checksum
                    else (KDC_ERR_GENERIC, KDC_ERR_INAPP_CKSUM))

                swap_checksum = functools.partial(self.unkeyed_pac_checksum,
                                                  checksum=checksum,
                                                  ctype=ctype)
                test_params = {
                    'expected_error_mode': expected_error_mode,
                    'allow_delegation': True,
                    'modify_client_tkt_fn': swap_checksum,
                    'expect_edata': False,
                }
                self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_unkeyed_service_checksum(self):
    # Present the service's ticket in which one PAC checksum has been
    # replaced by a checksum of each type in unkeyed_ctypes.
    for checksum in self.pac_checksum_types:
        for ctype in self.unkeyed_ctypes:
            with self.subTest(checksum=checksum, ctype=ctype):
                # Success is expected unless the server checksum is the
                # one that was replaced.
                expected_error_mode = 0
                expected_status = None
                if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
                    if ctype == Cksumtype.SHA1:
                        expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
                                               KDC_ERR_INAPP_CKSUM)
                        expected_status = ntstatus.NT_STATUS_LOGON_FAILURE
                    else:
                        expected_error_mode = (KDC_ERR_GENERIC,
                                               KDC_ERR_INAPP_CKSUM)
                        expected_status = (
                            ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)

                swap_checksum = functools.partial(self.unkeyed_pac_checksum,
                                                  checksum=checksum,
                                                  ctype=ctype)
                test_params = {
                    'expected_error_mode': expected_error_mode,
                    # Presence or absence of an NTSTATUS is not insisted
                    # upon.
                    'expect_status': None,
                    'expected_status': expected_status,
                    'allow_delegation': True,
                    'modify_service_tgt_fn': swap_checksum,
                }
                self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_unkeyed_client_checksum(self):
    self.skip_unless_fl2008()

    # Present a user ticket in which one PAC checksum has been replaced by
    # a checksum of each type in unkeyed_ctypes.
    for checksum in self.pac_checksum_types:
        for ctype in self.unkeyed_ctypes:
            with self.subTest(checksum=checksum, ctype=ctype):
                sha1_server_checksum = (
                    checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM
                    and ctype == Cksumtype.SHA1)
                expected_error_mode = (
                    (KDC_ERR_SUMTYPE_NOSUPP, KDC_ERR_INAPP_CKSUM)
                    if sha1_server_checksum
                    else (KDC_ERR_GENERIC, KDC_ERR_INAPP_CKSUM))

                swap_checksum = functools.partial(self.unkeyed_pac_checksum,
                                                  checksum=checksum,
                                                  ctype=ctype)
                test_params = {
                    'expected_error_mode': expected_error_mode,
                    # Presence or absence of an NTSTATUS is not insisted
                    # upon.
                    'expect_status': None,
                    'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
                    'allow_rbcd': True,
                    'pac_options': '0001',  # supports RBCD
                    'modify_client_tkt_fn': swap_checksum,
                }
                self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_unkeyed_service_checksum(self):
    self.skip_unless_fl2008()

    # Present the service's ticket in which one PAC checksum has been
    # replaced by a checksum of each type in unkeyed_ctypes.
    for checksum in self.pac_checksum_types:
        for ctype in self.unkeyed_ctypes:
            with self.subTest(checksum=checksum, ctype=ctype):
                # Success is expected unless the server checksum is the
                # one that was replaced.
                expected_error_mode = 0
                expected_status = None
                if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
                    if ctype == Cksumtype.SHA1:
                        expected_error_mode = (KDC_ERR_SUMTYPE_NOSUPP,
                                               KDC_ERR_INAPP_CKSUM)
                        expected_status = ntstatus.NT_STATUS_LOGON_FAILURE
                    else:
                        expected_error_mode = (KDC_ERR_GENERIC,
                                               KDC_ERR_INAPP_CKSUM)
                        expected_status = (
                            ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)

                swap_checksum = functools.partial(self.unkeyed_pac_checksum,
                                                  checksum=checksum,
                                                  ctype=ctype)
                test_params = {
                    'expected_error_mode': expected_error_mode,
                    # Presence or absence of an NTSTATUS is not insisted
                    # upon.
                    'expect_status': None,
                    'expected_status': expected_status,
                    'allow_rbcd': True,
                    'pac_options': '0001',  # supports RBCD
                    'modify_service_tgt_fn': swap_checksum,
                }
                self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_rc4_client_checksum(self):
    # Present a user ticket with RC4 checksums.
    functional_level = self.get_domain_functional_level(self.get_samdb())

    # At functional level 2008 or above the request is expected to fail
    # without e-data; below that it is expected to succeed.
    if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
        expected_error_mode = 0
        expect_edata = None
    else:
        expected_error_mode = (KDC_ERR_GENERIC, KDC_ERR_INAPP_CKSUM)
        expect_edata = False

    test_params = {
        'expected_error_mode': expected_error_mode,
        'allow_delegation': True,
        'modify_client_tkt_fn': self.rc4_pac_checksums,
        'expect_edata': expect_edata,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_rc4_client_checksum(self):
    self.skip_unless_fl2008()

    # Present a user ticket with RC4 checksums.
    test_params = {
        'expected_error_mode': (KDC_ERR_GENERIC, KDC_ERR_BADOPTION),
        # Presence or absence of an NTSTATUS is not insisted upon.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_NOT_SUPPORTED,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
        'modify_client_tkt_fn': self.rc4_pac_checksums,
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_constrained_delegation_rodc_issued(self):
    # RODC-issued constrained delegation tickets should be accepted.
    test_params = {
        'expected_error_mode': 0,
        'allow_delegation': True,
        # Both tickets must be signed by the same RODC.
        'modify_client_tkt_fn': self.signed_by_rodc,
        'modify_service_tgt_fn': self.issued_by_rodc,
        'client_opts': {
            'allowed_replication_mock': True,
            'revealed_to_mock_rodc': True,
        },
        'service1_opts': {
            'allowed_replication_mock': True,
            'revealed_to_mock_rodc': True,
        },
    }
    self._run_delegation_test(test_params)
|
||
|
||
def test_rbcd_rodc_issued(self):
    self.skip_unless_fl2008()

    # RODC-issued constrained delegation tickets should be accepted.
    test_params = {
        'expected_error_mode': 0,
        'allow_rbcd': True,
        'pac_options': '0001',  # supports RBCD
        # Both tickets must be signed by the same RODC.
        'modify_client_tkt_fn': self.signed_by_rodc,
        'modify_service_tgt_fn': self.issued_by_rodc,
        'client_opts': {
            'allowed_replication_mock': True,
            'revealed_to_mock_rodc': True,
        },
        'service1_opts': {
            'allowed_replication_mock': True,
            'revealed_to_mock_rodc': True,
        },
    }
    self._run_delegation_test(test_params)
|
||
|
||
def remove_pac_checksum(self, ticket, checksum):
    """Return *ticket* rebuilt with the given PAC checksum excluded.

    :param ticket: the ticket handed to modified_ticket().
    :param checksum: PAC buffer type of the checksum to leave out.
    :return: whatever modified_ticket() returns.
    """
    krbtgt_checksum_keys = self.get_krbtgt_checksum_key()
    excluded = {checksum: False}

    return self.modified_ticket(ticket,
                                checksum_keys=krbtgt_checksum_keys,
                                include_checksums=excluded)
|
||
|
||
def zeroed_pac_checksum(self, ticket, checksum):
    """Return *ticket* rebuilt with one PAC checksum made invalid.

    The key for *checksum* is replaced by a ZeroedChecksumKey built from
    the same key material and kvno; every other checksum keeps its
    regular key.

    :param ticket: the ticket to modify; its decryption_key is used as
        the server checksum key.
    :param checksum: PAC buffer type of the checksum to invalidate.
    :return: whatever modified_ticket() returns.
    """
    krbtgt_creds = self.get_krbtgt_creds()
    krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

    server_key = ticket.decryption_key

    checksum_keys = {
        krb5pac.PAC_TYPE_SRV_CHECKSUM: server_key,
        krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
        krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
        # Supply a key for the full checksum as well, as the sibling
        # helpers unkeyed_pac_checksum() and rc4_pac_checksums() do, so a
        # key is available for it when a different checksum is the one
        # being invalidated.
        krb5pac.PAC_TYPE_FULL_CHECKSUM: krbtgt_key,
    }

    # Only the server checksum is keyed with the service's own key.
    if checksum == krb5pac.PAC_TYPE_SRV_CHECKSUM:
        zeroed_key = server_key
    else:
        zeroed_key = krbtgt_key

    checksum_keys[checksum] = ZeroedChecksumKey(zeroed_key.key,
                                                zeroed_key.kvno)

    return self.modified_ticket(ticket,
                                checksum_keys=checksum_keys,
                                include_checksums={checksum: True})
|
||
|
||
def unkeyed_pac_checksum(self, ticket, checksum, ctype):
    """Return *ticket* rebuilt with one PAC checksum using type *ctype*.

    :param ticket: the ticket to modify; its decryption_key is used as
        the server checksum key.
    :param checksum: PAC buffer type of the checksum to alter.
    :param ctype: checksum type assigned to the replacement key.
    :return: whatever modified_ticket() returns.
    """
    krbtgt_key = self.TicketDecryptionKey_from_creds(
        self.get_krbtgt_creds())

    checksum_keys = {
        krb5pac.PAC_TYPE_SRV_CHECKSUM: ticket.decryption_key,
        krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
        krb5pac.PAC_TYPE_TICKET_CHECKSUM: krbtgt_key,
        krb5pac.PAC_TYPE_FULL_CHECKSUM: krbtgt_key,
    }

    # Copy the key currently used for this checksum, then override the
    # checksum type on the copy only.
    current_key = checksum_keys[checksum]
    replacement_key = RodcPacEncryptionKey(current_key.key,
                                           current_key.kvno)
    replacement_key.ctype = ctype
    checksum_keys[checksum] = replacement_key

    return self.modified_ticket(ticket,
                                checksum_keys=checksum_keys,
                                include_checksums={checksum: True})
|
||
|
||
def rc4_pac_checksums(self, ticket):
    """Return *ticket* rebuilt with its krbtgt-keyed checksums using RC4.

    The KDC, ticket and full checksums are keyed with the krbtgt's RC4
    key; the server checksum keeps the ticket's own decryption key.

    :param ticket: the ticket to modify.
    :return: whatever modified_ticket() returns.
    """
    rc4_krbtgt_key = self.TicketDecryptionKey_from_creds(
        self.get_krbtgt_creds(), etype=Enctype.RC4)

    checksum_keys = {
        krb5pac.PAC_TYPE_SRV_CHECKSUM: ticket.decryption_key,
        krb5pac.PAC_TYPE_KDC_CHECKSUM: rc4_krbtgt_key,
        krb5pac.PAC_TYPE_TICKET_CHECKSUM: rc4_krbtgt_key,
        krb5pac.PAC_TYPE_FULL_CHECKSUM: rc4_krbtgt_key,
    }

    # Every checksum that has a key above is to be included.
    include_checksums = dict.fromkeys(checksum_keys, True)

    return self.modified_ticket(ticket,
                                checksum_keys=checksum_keys,
                                include_checksums=include_checksums)
|
||
|
||
def add_delegation_info(self, ticket, *, services):
    """Return *ticket* rebuilt with a constrained delegation PAC buffer.

    :param ticket: the ticket to modify.
    :param services: iterable of service names recorded as the transited
        services of the new buffer.
    :return: whatever modified_ticket() returns.
    """
    def append_delegation_buffer(pac):
        buffers = pac.buffers

        # The PAC must not already contain a delegation buffer.
        self.assertNotIn(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION,
                         (buffer.type for buffer in buffers))

        transited = [lsa.String(service) for service in services]

        delegation = krb5pac.PAC_CONSTRAINED_DELEGATION()
        delegation.proxy_target = lsa.String('test_proxy_target')
        delegation.transited_services = transited
        delegation.num_transited_services = len(transited)

        container = krb5pac.PAC_CONSTRAINED_DELEGATION_CTR()
        container.info = delegation

        new_buffer = krb5pac.PAC_BUFFER()
        new_buffer.type = krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION
        new_buffer.info = container

        buffers.append(new_buffer)

        # Write the extended list back and keep the count in step.
        pac.buffers = buffers
        pac.num_buffers += 1

        return pac

    krbtgt_checksum_keys = self.get_krbtgt_checksum_key()

    return self.modified_ticket(ticket,
                                checksum_keys=krbtgt_checksum_keys,
                                modify_pac_fn=append_delegation_buffer)
|
||
|
||
def set_ticket_forwardable(self, ticket, flag, update_pac_checksums=True):
    """Return *ticket* rebuilt with its 'forwardable' flag set to *flag*.

    :param ticket: the ticket to modify.
    :param flag: value given to the 'forwardable' ticket flag.
    :param update_pac_checksums: passed on to modified_ticket(); when
        false, no checksum keys are supplied either.
    :return: whatever modified_ticket() returns.
    """
    set_forwardable = functools.partial(self.modify_ticket_flag,
                                        flag='forwardable',
                                        value=flag)

    # The krbtgt checksum keys are only fetched when they will be used.
    checksum_keys = (self.get_krbtgt_checksum_key()
                     if update_pac_checksums else None)

    return self.modified_ticket(ticket,
                                modify_fn=set_forwardable,
                                checksum_keys=checksum_keys,
                                update_pac_checksums=update_pac_checksums)
|
||
|
||
def remove_ticket_pac(self, ticket):
    """Return *ticket* rebuilt by modified_ticket() with exclude_pac=True."""
    without_pac = self.modified_ticket(ticket, exclude_pac=True)
    return without_pac
|
||
|
||
|
||
if __name__ == "__main__":
    # Run the tests in this module when it is executed as a script.
    import unittest

    # Both debugging switches are turned off for a direct run.
    global_asn1_print = False
    global_hexdump = False

    unittest.main()
|