1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-11 05:18:09 +03:00

selftest: krb5 account creation: clarify account type as an enum

This makes the code clearer with a symbolic constant rather
than a True/False boolean.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=14869

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
This commit is contained in:
Joseph Sutton 2021-10-08 15:40:09 +13:00 committed by Stefan Metzmacher
parent aacb18f920
commit 49306f74eb
7 changed files with 100 additions and 63 deletions

View File

@ -171,9 +171,10 @@ class KerberosASCanonicalizationTests(KDCBaseTest):
def machine_account_creds(self):
if self.machine_creds is None:
samdb = self.get_samdb()
self.machine_creds, _ = self.create_account(samdb,
MACHINE_NAME,
machine_account=True)
self.machine_creds, _ = self.create_account(
samdb,
MACHINE_NAME,
account_type=self.AccountType.COMPUTER)
self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
self.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)

View File

@ -23,6 +23,7 @@ import tempfile
import binascii
import collections
import secrets
from enum import Enum, auto
from collections import namedtuple
import ldb
@ -90,6 +91,10 @@ class KDCBaseTest(RawKerberosTest):
""" Base class for KDC tests.
"""
class AccountType(Enum):
USER = auto()
COMPUTER = auto()
@classmethod
def setUpClass(cls):
super().setUpClass()
@ -230,7 +235,7 @@ class KDCBaseTest(RawKerberosTest):
return default_enctypes
def create_account(self, samdb, name, machine_account=False,
def create_account(self, samdb, name, account_type=AccountType.USER,
spn=None, upn=None, additional_details=None,
ou=None, account_control=0):
'''Create an account for testing.
@ -238,8 +243,10 @@ class KDCBaseTest(RawKerberosTest):
which is used by tearDownClass to clean up the created accounts.
'''
if ou is None:
guid = (DS_GUID_COMPUTERS_CONTAINER if machine_account
else DS_GUID_USERS_CONTAINER)
if account_type is account_type.COMPUTER:
guid = DS_GUID_COMPUTERS_CONTAINER
else:
guid = DS_GUID_USERS_CONTAINER
ou = samdb.get_wellknown_dn(samdb.get_default_basedn(), guid)
@ -248,14 +255,17 @@ class KDCBaseTest(RawKerberosTest):
# remove the account if it exists, this will happen if a previous test
# run failed
delete_force(samdb, dn)
if machine_account:
object_class = "computer"
account_name = "%s$" % name
account_control |= UF_WORKSTATION_TRUST_ACCOUNT
else:
if account_type is self.AccountType.USER:
object_class = "user"
account_name = name
account_control |= UF_NORMAL_ACCOUNT
else:
object_class = "computer"
account_name = "%s$" % name
if account_type is self.AccountType.COMPUTER:
account_control |= UF_WORKSTATION_TRUST_ACCOUNT
else:
self.fail()
password = generate_random_password(32, 32)
utf16pw = ('"%s"' % password).encode('utf-16-le')
@ -267,6 +277,10 @@ class KDCBaseTest(RawKerberosTest):
"userAccountControl": str(account_control),
"unicodePwd": utf16pw}
if spn is not None:
if isinstance(spn, str):
spn = spn.format(account=account_name)
else:
spn = tuple(s.format(account=account_name) for s in spn)
details["servicePrincipalName"] = spn
if upn is not None:
details["userPrincipalName"] = upn
@ -280,10 +294,10 @@ class KDCBaseTest(RawKerberosTest):
creds.set_domain(samdb.domain_netbios_name().upper())
creds.set_password(password)
creds.set_username(account_name)
if machine_account:
creds.set_workstation(name)
else:
if account_type is self.AccountType.USER:
creds.set_workstation('')
else:
creds.set_workstation(name)
creds.set_dn(ldb.Dn(samdb, dn))
creds.set_spn(spn)
#
@ -609,13 +623,14 @@ class KDCBaseTest(RawKerberosTest):
return cleanup
def get_cached_creds(self, *,
machine_account,
account_type,
opts=None,
use_cache=True):
if opts is None:
opts = {}
opts_default = {
'spn': None,
'allowed_replication': False,
'allowed_replication_mock': False,
'denied_replication': False,
@ -632,7 +647,7 @@ class KDCBaseTest(RawKerberosTest):
}
account_opts = {
'machine_account': machine_account,
'account_type': account_type,
**opts_default,
**opts
}
@ -651,7 +666,8 @@ class KDCBaseTest(RawKerberosTest):
return creds
def create_account_opts(self, *,
machine_account,
account_type,
spn,
allowed_replication,
allowed_replication_mock,
denied_replication,
@ -665,12 +681,13 @@ class KDCBaseTest(RawKerberosTest):
delegation_from_dn,
trusted_to_auth_for_delegation,
fast_support):
if machine_account:
self.assertFalse(not_delegated)
else:
if account_type is self.AccountType.USER:
self.assertIsNone(spn)
self.assertIsNone(delegation_to_spn)
self.assertIsNone(delegation_from_dn)
self.assertFalse(trusted_to_auth_for_delegation)
else:
self.assertFalse(not_delegated)
samdb = self.get_samdb()
rodc_samdb = self.get_rodc_samdb()
@ -707,13 +724,11 @@ class KDCBaseTest(RawKerberosTest):
details['msDS-AllowedToActOnBehalfOfOtherIdentity'] = (
security_descriptor)
if machine_account:
if spn is None and account_type is not self.AccountType.USER:
spn = 'host/' + user_name
else:
spn = None
creds, dn = self.create_account(samdb, user_name,
machine_account=machine_account,
account_type=account_type,
spn=spn,
additional_details=details,
account_control=user_account_control)
@ -787,7 +802,7 @@ class KDCBaseTest(RawKerberosTest):
allow_missing_password=False,
allow_missing_keys=True):
def create_client_account():
return self.get_cached_creds(machine_account=False)
return self.get_cached_creds(account_type=self.AccountType.USER)
c = self._get_krb5_creds(prefix='CLIENT',
allow_missing_password=allow_missing_password,
@ -799,7 +814,7 @@ class KDCBaseTest(RawKerberosTest):
allow_missing_password=False,
allow_missing_keys=True):
def create_mach_account():
return self.get_cached_creds(machine_account=True,
return self.get_cached_creds(account_type=self.AccountType.COMPUTER,
opts={'fast_support': True})
c = self._get_krb5_creds(prefix='MAC',
@ -813,7 +828,7 @@ class KDCBaseTest(RawKerberosTest):
allow_missing_keys=True):
def create_service_account():
return self.get_cached_creds(
machine_account=True,
account_type=self.AccountType.COMPUTER,
opts={
'trusted_to_auth_for_delegation': True,
'fast_support': True

View File

@ -148,7 +148,8 @@ class KdcTgsTests(KDCBaseTest):
samdb = self.get_samdb()
user_name = "tsttktusr"
(uc, dn) = self.create_account(samdb, user_name)
(mc, _) = self.create_account(samdb, "tsttktmac", machine_account=True)
(mc, _) = self.create_account(samdb, "tsttktmac",
account_type=self.AccountType.COMPUTER)
realm = uc.get_realm().lower()
# Do the initial AS-REQ, should get a pre-authentication required
@ -282,7 +283,7 @@ class KdcTgsTests(KDCBaseTest):
def test_client_no_auth_data_required(self):
client_creds = self.get_cached_creds(
machine_account=False,
account_type=self.AccountType.USER,
opts={'no_auth_data_required': True})
service_creds = self.get_service_creds()
@ -299,7 +300,7 @@ class KdcTgsTests(KDCBaseTest):
def test_service_no_auth_data_required(self):
client_creds = self.get_client_creds()
service_creds = self.get_cached_creds(
machine_account=True,
account_type=self.AccountType.COMPUTER,
opts={'no_auth_data_required': True})
tgt = self.get_tgt(client_creds)

View File

@ -95,7 +95,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
realm = uc.get_realm().lower()
mach_name = "mskilemac"
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name,
account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -151,7 +152,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
#
samdb = self.get_samdb()
mach_name = "mskilemac"
(mc, dn) = self.create_account(samdb, mach_name, machine_account=True)
(mc, dn) = self.create_account(samdb, mach_name,
account_type=self.AccountType.COMPUTER)
realm = mc.get_realm().lower()
# Do the initial AS-REQ, should get a pre-authentication required
@ -215,7 +217,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
realm = uc.get_realm().lower()
mach_name = "mskilemac"
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name,
account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -286,7 +289,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
mach_name = "mskilemac"
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name,
account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, as we've set UF_DONT_REQUIRE_PREAUTH
# we should get a valid AS-RESP
@ -351,7 +355,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
mach_name = "mskilemac"
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name,
account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -420,7 +425,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
mach_name = "mskilemac"
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name,
account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -459,7 +465,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
realm = uc.get_realm().lower()
mach_name = "mskilemac"
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name,
account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -523,7 +530,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
ename = user_name + "@" + realm
mach_name = "mskilemac"
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name,
account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -586,7 +594,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
realm = uc.get_realm().lower()
mach_name = "mskilemac"
(mc, dn) = self.create_account(samdb, mach_name, machine_account=True)
(mc, dn) = self.create_account(samdb, mach_name,
account_type=self.AccountType.COMPUTER)
ename = mach_name + "@" + realm
uname = mach_name + "$@" + realm
@ -661,7 +670,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
ename = alt_name + "@" + realm
mach_name = "mskilemac"
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name,
account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, as we've set UF_DONT_REQUIRE_PREAUTH
# we should get a valid AS-RESP
@ -728,7 +738,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
uname = user_name + "@" + realm
mach_name = "mskilemac"
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name,
account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -798,7 +809,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
ename = alt_name + "@" + realm
mach_name = "mskilemac"
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name,
account_type=self.AccountType.COMPUTER)
# Do the initial AS-REQ, should get a pre-authentication required
# response

View File

@ -39,12 +39,12 @@ class RodcKerberosTests(KDCBaseTest):
# and including the RODCIdentifier.
def test_rodc_ticket_signature(self):
user_creds = self.get_cached_creds(
machine_account=False,
account_type=self.AccountType.USER,
opts={
'revealed_to_rodc': True
})
target_creds = self.get_cached_creds(
machine_account=True,
account_type=self.AccountType.COMPUTER,
opts={
'revealed_to_rodc': True
})

View File

@ -220,12 +220,14 @@ class S4UKerberosTests(KDCBaseTest):
def _run_s4u2self_test(self, kdc_dict):
client_opts = kdc_dict.pop('client_opts', None)
client_creds = self.get_cached_creds(machine_account=False,
opts=client_opts)
client_creds = self.get_cached_creds(
account_type=self.AccountType.USER,
opts=client_opts)
service_opts = kdc_dict.pop('service_opts', None)
service_creds = self.get_cached_creds(machine_account=True,
opts=service_opts)
service_creds = self.get_cached_creds(
account_type=self.AccountType.COMPUTER,
opts=service_opts)
service_tgt = self.get_tgt(service_creds)
modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
@ -432,8 +434,9 @@ class S4UKerberosTests(KDCBaseTest):
def _run_delegation_test(self, kdc_dict):
client_opts = kdc_dict.pop('client_opts', None)
client_creds = self.get_cached_creds(machine_account=False,
opts=client_opts)
client_creds = self.get_cached_creds(
account_type=self.AccountType.USER,
opts=client_opts)
service1_opts = kdc_dict.pop('service1_opts', {})
service2_opts = kdc_dict.pop('service2_opts', {})
@ -443,24 +446,28 @@ class S4UKerberosTests(KDCBaseTest):
self.assertFalse(allow_delegation and allow_rbcd)
if allow_rbcd:
service1_creds = self.get_cached_creds(machine_account=True,
opts=service1_opts)
service1_creds = self.get_cached_creds(
account_type=self.AccountType.COMPUTER,
opts=service1_opts)
self.assertNotIn('delegation_from_dn', service2_opts)
service2_opts['delegation_from_dn'] = str(service1_creds.get_dn())
service2_creds = self.get_cached_creds(machine_account=True,
opts=service2_opts)
service2_creds = self.get_cached_creds(
account_type=self.AccountType.COMPUTER,
opts=service2_opts)
else:
service2_creds = self.get_cached_creds(machine_account=True,
opts=service2_opts)
service2_creds = self.get_cached_creds(
account_type=self.AccountType.COMPUTER,
opts=service2_opts)
if allow_delegation:
self.assertNotIn('delegation_to_spn', service1_opts)
service1_opts['delegation_to_spn'] = service2_creds.get_spn()
service1_creds = self.get_cached_creds(machine_account=True,
opts=service1_opts)
service1_creds = self.get_cached_creds(
account_type=self.AccountType.COMPUTER,
opts=service1_opts)
client_tkt_options = kdc_dict.pop('client_tkt_options', 'forwardable')
expected_flags = krb5_asn1.TicketFlags(client_tkt_options)

View File

@ -55,11 +55,12 @@ class CcacheTests(KDCBaseTest):
(user_credentials, _) = self.create_account(samdb, user_name)
# Create the machine account.
(mach_credentials, _) = self.create_account(samdb,
mach_name,
machine_account=True,
spn="%s/%s" % (service,
mach_name))
(mach_credentials, _) = self.create_account(
samdb,
mach_name,
account_type=self.AccountType.COMPUTER,
spn="%s/%s" % (service,
mach_name))
# Talk to the KDC to obtain the service ticket, which gets placed into
# the cache. The machine account name has to match the name in the