1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-28 01:58:17 +03:00

tests/krb5: Move some functions round to prepare for splitting the class

No effective code change.

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Joseph Sutton 2023-08-11 09:37:32 +12:00 committed by Andrew Bartlett
parent 942cc0b626
commit 55c09c91ea

View File

@ -178,11 +178,6 @@ class AuthnPolicyTests(AuthLogTestBase, KdcTgsBaseTests):
cls._max_ticket_life = None
cls._max_renew_life = None
def setUp(self):
super().setUp()
self.do_asn1_print = global_asn1_print
self.do_hexdump = global_hexdump
def take(self, n, iterable, *, take_all=True):
"""Yield n items from an iterable."""
i = -1
@ -959,6 +954,175 @@ class AuthnPolicyTests(AuthLogTestBase, KdcTgsBaseTests):
audit_event=server_policy_event,
reason=server_policy_reason)
def check_ticket_times(self,
ticket_creds,
expected_life=None,
expected_renew_life=None):
ticket = ticket_creds.ticket_private
authtime = ticket['authtime']
starttime = ticket.get('starttime', authtime)
endtime = ticket['endtime']
renew_till = ticket.get('renew-till', None)
starttime = self.get_EpochFromKerberosTime(starttime)
if expected_life is not None:
actual_end = self.get_EpochFromKerberosTime(
endtime.decode('ascii'))
actual_lifetime = actual_end - starttime
self.assertEqual(expected_life, actual_lifetime)
if renew_till is None:
self.assertIsNone(expected_renew_life)
else:
if expected_renew_life is not None:
actual_renew_till = self.get_EpochFromKerberosTime(
renew_till.decode('ascii'))
actual_renew_life = actual_renew_till - starttime
self.assertEqual(expected_renew_life, actual_renew_life)
def _get_tgt(self, creds, *,
armor_tgt=None,
till=None,
expected_error=0,
expect_status=None,
expected_status=None):
user_name = creds.get_username()
realm = creds.get_realm()
salt = creds.get_salt()
cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
names=user_name.split('/'))
sname = self.PrincipalName_create(name_type=NT_SRV_INST,
names=['krbtgt', realm])
expected_sname = self.PrincipalName_create(
name_type=NT_SRV_INST, names=['krbtgt', realm.upper()])
expected_cname = cname
if till is None:
till = self.get_KerberosTime(offset=36000)
renew_time = till
krbtgt_creds = self.get_krbtgt_creds()
ticket_decryption_key = (
self.TicketDecryptionKey_from_creds(krbtgt_creds))
expected_etypes = krbtgt_creds.tgs_supported_enctypes
kdc_options = str(krb5_asn1.KDCOptions('renewable'))
# Contrary to Microsofts documentation, the returned ticket is
# renewable.
expected_flags = krb5_asn1.TicketFlags('renewable')
preauth_key = self.PasswordKey_from_creds(creds,
kcrypto.Enctype.AES256)
expected_realm = realm.upper()
etypes = kcrypto.Enctype.AES256, kcrypto.Enctype.RC4
if armor_tgt is not None:
authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
armor_key = self.generate_armor_key(authenticator_subkey,
armor_tgt.session_key)
armor_subkey = authenticator_subkey
client_challenge_key = self.generate_client_challenge_key(
armor_key, preauth_key)
enc_challenge_padata = self.get_challenge_pa_data(
client_challenge_key)
def generate_fast_padata_fn(kdc_exchange_dict,
_callback_dict,
req_body):
return [enc_challenge_padata], req_body
generate_fast_fn = self.generate_simple_fast
generate_fast_armor_fn = self.generate_ap_req
generate_padata_fn = None
fast_armor_type = FX_FAST_ARMOR_AP_REQUEST
else:
ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(
preauth_key)
def generate_padata_fn(kdc_exchange_dict,
_callback_dict,
req_body):
return [ts_enc_padata], req_body
generate_fast_fn = None
generate_fast_padata_fn = None
generate_fast_armor_fn = None
armor_key = None
armor_subkey = None
fast_armor_type = None
if not expected_error:
check_error_fn = None
check_rep_fn = self.generic_check_kdc_rep
else:
check_error_fn = self.generic_check_kdc_error
check_rep_fn = None
kdc_exchange_dict = self.as_exchange_dict(
creds=creds,
expected_error_mode=expected_error,
expect_status=expect_status,
expected_status=expected_status,
expected_crealm=expected_realm,
expected_cname=expected_cname,
expected_srealm=expected_realm,
expected_sname=expected_sname,
expected_salt=salt,
expected_flags=expected_flags,
expected_supported_etypes=expected_etypes,
generate_padata_fn=generate_padata_fn,
generate_fast_padata_fn=generate_fast_padata_fn,
generate_fast_fn=generate_fast_fn,
generate_fast_armor_fn=generate_fast_armor_fn,
fast_armor_type=fast_armor_type,
check_error_fn=check_error_fn,
check_rep_fn=check_rep_fn,
check_kdc_private_fn=self.generic_check_kdc_private,
armor_key=armor_key,
armor_tgt=armor_tgt,
armor_subkey=armor_subkey,
kdc_options=kdc_options,
preauth_key=preauth_key,
ticket_decryption_key=ticket_decryption_key,
# PA-DATA types are not important for these tests.
check_patypes=False)
rep = self._generic_kdc_exchange(kdc_exchange_dict,
cname=cname,
realm=realm,
sname=sname,
till_time=till,
renew_time=renew_time,
etypes=etypes)
if expected_error:
self.check_error_rep(rep, expected_error)
return None
self.check_as_reply(rep)
ticket_creds = kdc_exchange_dict['rep_ticket_creds']
return ticket_creds
def setUp(self):
super().setUp()
self.do_asn1_print = global_asn1_print
self.do_hexdump = global_hexdump
def test_authn_policy_tgt_lifetime_user(self):
# Create an authentication policy with certain TGT lifetimes set.
user_life = 111
@ -8225,170 +8389,6 @@ class AuthnPolicyTests(AuthLogTestBase, KdcTgsBaseTests):
client_policy_status=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION,
event=AuditEvent.NTLM_DEVICE_RESTRICTION)
def check_ticket_times(self,
ticket_creds,
expected_life=None,
expected_renew_life=None):
ticket = ticket_creds.ticket_private
authtime = ticket['authtime']
starttime = ticket.get('starttime', authtime)
endtime = ticket['endtime']
renew_till = ticket.get('renew-till', None)
starttime = self.get_EpochFromKerberosTime(starttime)
if expected_life is not None:
actual_end = self.get_EpochFromKerberosTime(
endtime.decode('ascii'))
actual_lifetime = actual_end - starttime
self.assertEqual(expected_life, actual_lifetime)
if renew_till is None:
self.assertIsNone(expected_renew_life)
else:
if expected_renew_life is not None:
actual_renew_till = self.get_EpochFromKerberosTime(
renew_till.decode('ascii'))
actual_renew_life = actual_renew_till - starttime
self.assertEqual(expected_renew_life, actual_renew_life)
def _get_tgt(self, creds, *,
armor_tgt=None,
till=None,
expected_error=0,
expect_status=None,
expected_status=None):
user_name = creds.get_username()
realm = creds.get_realm()
salt = creds.get_salt()
cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
names=user_name.split('/'))
sname = self.PrincipalName_create(name_type=NT_SRV_INST,
names=['krbtgt', realm])
expected_sname = self.PrincipalName_create(
name_type=NT_SRV_INST, names=['krbtgt', realm.upper()])
expected_cname = cname
if till is None:
till = self.get_KerberosTime(offset=36000)
renew_time = till
krbtgt_creds = self.get_krbtgt_creds()
ticket_decryption_key = (
self.TicketDecryptionKey_from_creds(krbtgt_creds))
expected_etypes = krbtgt_creds.tgs_supported_enctypes
kdc_options = str(krb5_asn1.KDCOptions('renewable'))
# Contrary to Microsofts documentation, the returned ticket is
# renewable.
expected_flags = krb5_asn1.TicketFlags('renewable')
preauth_key = self.PasswordKey_from_creds(creds,
kcrypto.Enctype.AES256)
expected_realm = realm.upper()
etypes = kcrypto.Enctype.AES256, kcrypto.Enctype.RC4
if armor_tgt is not None:
authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
armor_key = self.generate_armor_key(authenticator_subkey,
armor_tgt.session_key)
armor_subkey = authenticator_subkey
client_challenge_key = self.generate_client_challenge_key(
armor_key, preauth_key)
enc_challenge_padata = self.get_challenge_pa_data(
client_challenge_key)
def generate_fast_padata_fn(kdc_exchange_dict,
_callback_dict,
req_body):
return [enc_challenge_padata], req_body
generate_fast_fn = self.generate_simple_fast
generate_fast_armor_fn = self.generate_ap_req
generate_padata_fn = None
fast_armor_type = FX_FAST_ARMOR_AP_REQUEST
else:
ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(
preauth_key)
def generate_padata_fn(kdc_exchange_dict,
_callback_dict,
req_body):
return [ts_enc_padata], req_body
generate_fast_fn = None
generate_fast_padata_fn = None
generate_fast_armor_fn = None
armor_key = None
armor_subkey = None
fast_armor_type = None
if not expected_error:
check_error_fn = None
check_rep_fn = self.generic_check_kdc_rep
else:
check_error_fn = self.generic_check_kdc_error
check_rep_fn = None
kdc_exchange_dict = self.as_exchange_dict(
creds=creds,
expected_error_mode=expected_error,
expect_status=expect_status,
expected_status=expected_status,
expected_crealm=expected_realm,
expected_cname=expected_cname,
expected_srealm=expected_realm,
expected_sname=expected_sname,
expected_salt=salt,
expected_flags=expected_flags,
expected_supported_etypes=expected_etypes,
generate_padata_fn=generate_padata_fn,
generate_fast_padata_fn=generate_fast_padata_fn,
generate_fast_fn=generate_fast_fn,
generate_fast_armor_fn=generate_fast_armor_fn,
fast_armor_type=fast_armor_type,
check_error_fn=check_error_fn,
check_rep_fn=check_rep_fn,
check_kdc_private_fn=self.generic_check_kdc_private,
armor_key=armor_key,
armor_tgt=armor_tgt,
armor_subkey=armor_subkey,
kdc_options=kdc_options,
preauth_key=preauth_key,
ticket_decryption_key=ticket_decryption_key,
# PA-DATA types are not important for these tests.
check_patypes=False)
rep = self._generic_kdc_exchange(kdc_exchange_dict,
cname=cname,
realm=realm,
sname=sname,
till_time=till,
renew_time=renew_time,
etypes=etypes)
if expected_error:
self.check_error_rep(rep, expected_error)
return None
self.check_as_reply(rep)
ticket_creds = kdc_exchange_dict['rep_ticket_creds']
return ticket_creds
if __name__ == '__main__':
global_asn1_print = False