1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-20 22:50:26 +03:00

tests/krb5/kdc_base_test.py: Create database connection only when needed

Now the database connection is only created on its first use, which
means database credentials are no longer required for tests that don't
make use of it.

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14817
(cherry picked from commit 4f5566be4839838e0e3e501a030bcf6e85ff5159)
This commit is contained in:
Joseph Sutton 2021-06-16 11:04:00 +12:00 committed by Jule Anger
parent c12cc69371
commit 9ce0d56ed4
7 changed files with 116 additions and 86 deletions

View File

@ -89,15 +89,7 @@ class KDCBaseTest(RawKerberosTest):
cls.credentials = c
cls.session = system_session()
cls.ldb = SamDB(url="ldap://%s" % cls.host,
session_info=cls.session,
credentials=cls.credentials,
lp=cls.lp)
# fetch the dnsHostName from the RootDse
res = cls.ldb.search(
base="", expression="", scope=SCOPE_BASE, attrs=["dnsHostName"])
cls.dns_host_name = str(res[0]['dnsHostName'])
cls._ldb = None
# A set containing DNs of accounts created as part of testing.
cls.accounts = set()
@ -107,8 +99,9 @@ class KDCBaseTest(RawKerberosTest):
# Clean up any accounts created by create_account. This is
# done in tearDownClass() rather than tearDown(), so that
# accounts need only be created once for permutation tests.
for dn in cls.accounts:
delete_force(cls.ldb, dn)
if cls._ldb is not None:
for dn in cls.accounts:
delete_force(cls._ldb, dn)
super().tearDownClass()
def setUp(self):
@ -116,16 +109,27 @@ class KDCBaseTest(RawKerberosTest):
self.do_asn1_print = global_asn1_print
self.do_hexdump = global_hexdump
def create_account(self, name, machine_account=False, spn=None, upn=None):
def get_samdb(self):
if self._ldb is None:
session = system_session()
type(self)._ldb = SamDB(url="ldap://%s" % self.host,
session_info=session,
credentials=self.credentials,
lp=self.lp)
return self._ldb
def create_account(self, ldb, name, machine_account=False,
spn=None, upn=None):
'''Create an account for testing.
The dn of the created account is added to self.accounts,
which is used by tearDownClass to clean up the created accounts.
'''
dn = "cn=%s,%s" % (name, self.ldb.domain_dn())
dn = "cn=%s,%s" % (name, ldb.domain_dn())
# remove the account if it exists, this will happen if a previous test
# run failed
delete_force(self.ldb, dn)
delete_force(ldb, dn)
if machine_account:
object_class = "computer"
account_name = "%s$" % name
@ -148,12 +152,12 @@ class KDCBaseTest(RawKerberosTest):
details["servicePrincipalName"] = spn
if upn is not None:
details["userPrincipalName"] = upn
self.ldb.add(details)
ldb.add(details)
creds = Credentials()
creds.guess(self.lp)
creds.set_realm(self.ldb.domain_dns_name().upper())
creds.set_domain(self.ldb.domain_netbios_name().upper())
creds.set_realm(ldb.domain_dns_name().upper())
creds.set_domain(ldb.domain_netbios_name().upper())
creds.set_password(password)
creds.set_username(account_name)
if machine_account:
@ -425,38 +429,38 @@ class KDCBaseTest(RawKerberosTest):
enc_part, asn1Spec=krb5_asn1.EncTicketPart())
return enc_ticket_part
def get_objectSid(self, dn):
def get_objectSid(self, samdb, dn):
''' Get the objectSID for a DN
Note: performs an Ldb query.
'''
res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
res = samdb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
self.assertTrue(len(res) == 1, "did not get objectSid for %s" % dn)
sid = self.ldb.schema_format_value("objectSID", res[0]["objectSID"][0])
sid = samdb.schema_format_value("objectSID", res[0]["objectSID"][0])
return sid.decode('utf8')
def add_attribute(self, dn_str, name, value):
def add_attribute(self, samdb, dn_str, name, value):
if isinstance(value, list):
values = value
else:
values = [value]
flag = ldb.FLAG_MOD_ADD
dn = ldb.Dn(self.ldb, dn_str)
dn = ldb.Dn(samdb, dn_str)
msg = ldb.Message(dn)
msg[name] = ldb.MessageElement(values, flag, name)
self.ldb.modify(msg)
samdb.modify(msg)
def modify_attribute(self, dn_str, name, value):
def modify_attribute(self, samdb, dn_str, name, value):
if isinstance(value, list):
values = value
else:
values = [value]
flag = ldb.FLAG_MOD_REPLACE
dn = ldb.Dn(self.ldb, dn_str)
dn = ldb.Dn(samdb, dn_str)
msg = ldb.Message(dn)
msg[name] = ldb.MessageElement(values, flag, name)
self.ldb.modify(msg)
samdb.modify(msg)
def create_ccache(self, cname, ticket, enc_part):
""" Lay out a version 4 on-disk credentials cache, to be read using the

View File

@ -49,8 +49,9 @@ class KdcTgsTests(KDCBaseTest):
that differs from that provided to the krbtgt
'''
# Create the user account
samdb = self.get_samdb()
user_name = "tsttktusr"
(uc, _) = self.create_account(user_name)
(uc, _) = self.create_account(samdb, user_name)
realm = uc.get_realm().lower()
# Do the initial AS-REQ, should get a pre-authentication required
@ -81,7 +82,7 @@ class KdcTgsTests(KDCBaseTest):
names=["Administrator"])
sname = self.PrincipalName_create(
name_type=NT_PRINCIPAL,
names=["host", self.dns_host_name])
names=["host", samdb.host_dns_name()])
(rep, enc_part) = self.tgs_req(cname, sname, realm, ticket, key, etype)
@ -98,8 +99,9 @@ class KdcTgsTests(KDCBaseTest):
'''Get a ticket to the ldap service
'''
# Create the user account
samdb = self.get_samdb()
user_name = "tsttktusr"
(uc, _) = self.create_account(user_name)
(uc, _) = self.create_account(samdb, user_name)
realm = uc.get_realm().lower()
# Do the initial AS-REQ, should get a pre-authentication required
@ -126,7 +128,7 @@ class KdcTgsTests(KDCBaseTest):
# Request a ticket to the ldap service
sname = self.PrincipalName_create(
name_type=NT_SRV_INST,
names=["ldap", self.dns_host_name])
names=["ldap", samdb.host_dns_name()])
(rep, _) = self.tgs_req(
cname, sname, uc.get_realm(), ticket, key, etype)
@ -137,9 +139,10 @@ class KdcTgsTests(KDCBaseTest):
# Create a user and machine account for the test.
#
samdb = self.get_samdb()
user_name = "tsttktusr"
(uc, dn) = self.create_account(user_name)
(mc, _) = self.create_account("tsttktmac", machine_account=True)
(uc, dn) = self.create_account(samdb, user_name)
(mc, _) = self.create_account(samdb, "tsttktmac", machine_account=True)
realm = uc.get_realm().lower()
# Do the initial AS-REQ, should get a pre-authentication required
@ -179,7 +182,7 @@ class KdcTgsTests(KDCBaseTest):
enc_part = self.decode_service_ticket(mc, ticket)
pac_data = self.get_pac_data(enc_part['authorization-data'])
sid = self.get_objectSid(dn)
sid = self.get_objectSid(samdb, dn)
upn = "%s@%s" % (uc.get_username(), realm)
self.assertEqual(
uc.get_username(),

View File

@ -49,10 +49,10 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
self.do_asn1_print = global_asn1_print
self.do_hexdump = global_hexdump
def check_pac(self, auth_data, dn, uc, name, upn=None):
def check_pac(self, samdb, auth_data, dn, uc, name, upn=None):
pac_data = self.get_pac_data(auth_data)
sid = self.get_objectSid(dn)
sid = self.get_objectSid(samdb, dn)
if upn is None:
upn = "%s@%s" % (name, uc.get_realm().lower())
if name.endswith('$'):
@ -89,12 +89,13 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# Create user and machine accounts for the test.
#
samdb = self.get_samdb()
user_name = "mskileusr"
(uc, dn) = self.create_account(user_name)
(uc, dn) = self.create_account(samdb, user_name)
realm = uc.get_realm().lower()
mach_name = "mskilemac"
(mc, _) = self.create_account(mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -131,7 +132,7 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# Check the contents of the pac, and the ticket
ticket = rep['ticket']
enc_part = self.decode_service_ticket(mc, ticket)
self.check_pac(enc_part['authorization-data'], dn, uc, user_name)
self.check_pac(samdb, enc_part['authorization-data'], dn, uc, user_name)
# check the crealm and cname
cname = enc_part['cname']
self.assertEqual(NT_PRINCIPAL, cname['name-type'])
@ -147,12 +148,13 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# Create a machine account for the test.
#
samdb = self.get_samdb()
user_name = "mskilemac"
(mc, dn) = self.create_account(user_name, machine_account=True)
(mc, dn) = self.create_account(samdb, user_name, machine_account=True)
realm = mc.get_realm().lower()
mach_name = "mskilemac"
(mc, _) = self.create_account(mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -189,7 +191,7 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# Check the contents of the pac, and the ticket
ticket = rep['ticket']
enc_part = self.decode_service_ticket(mc, ticket)
self.check_pac(enc_part['authorization-data'], dn, mc, mach_name + '$')
self.check_pac(samdb, enc_part['authorization-data'], dn, mc, mach_name + '$')
# check the crealm and cname
cname = enc_part['cname']
self.assertEqual(NT_PRINCIPAL, cname['name-type'])
@ -206,14 +208,15 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
'''
# Create a user account for the test.
#
samdb = self.get_samdb()
user_name = "mskileusr"
upn_name = "mskileupn"
upn = upn_name + "@" + self.credentials.get_realm().lower()
(uc, dn) = self.create_account(user_name, upn=upn)
(uc, dn) = self.create_account(samdb, user_name, upn=upn)
realm = uc.get_realm().lower()
mach_name = "mskilemac"
(mc, _) = self.create_account(mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -250,7 +253,7 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# Check the contents of the service ticket
ticket = rep['ticket']
enc_part = self.decode_service_ticket(mc, ticket)
self.check_pac(enc_part['authorization-data'], dn, uc, upn_name)
self.check_pac(samdb, enc_part['authorization-data'], dn, uc, upn_name)
# check the crealm and cname
cname = enc_part['cname']
self.assertEqual(NT_PRINCIPAL, cname['name-type'])
@ -273,19 +276,21 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# setting UF_DONT_REQUIRE_PREAUTH seems to be the only way
# to trigger the no pre-auth step
samdb = self.get_samdb()
user_name = "mskileusr"
alt_name = "mskilealtsec"
(uc, dn) = self.create_account(user_name)
(uc, dn) = self.create_account(samdb, user_name)
realm = uc.get_realm().lower()
alt_sec = "Kerberos:%s@%s" % (alt_name, realm)
self.add_attribute(dn, "altSecurityIdentities", alt_sec)
self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
self.modify_attribute(
samdb,
dn,
"userAccountControl",
str(UF_NORMAL_ACCOUNT | UF_DONT_REQUIRE_PREAUTH))
mach_name = "mskilemac"
(mc, _) = self.create_account(mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
# Do the initial AS-REQ, as we've set UF_DONT_REQUIRE_PREAUTH
# we should get a valid AS-RESP
@ -340,15 +345,16 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# Create user and machine accounts for the test.
#
samdb = self.get_samdb()
user_name = "mskileusr"
alt_name = "mskilealtsec"
(uc, dn) = self.create_account(user_name)
(uc, dn) = self.create_account(samdb, user_name)
realm = uc.get_realm().lower()
alt_sec = "Kerberos:%s@%s" % (alt_name, realm)
self.add_attribute(dn, "altSecurityIdentities", alt_sec)
self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
mach_name = "mskilemac"
(mc, _) = self.create_account(mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -406,15 +412,16 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# Create user and machine accounts for the test.
#
samdb = self.get_samdb()
user_name = "mskileusr"
alt_name = "mskilealtsec"
(uc, dn) = self.create_account(user_name)
(uc, dn) = self.create_account(samdb, user_name)
realm = uc.get_realm().lower()
alt_sec = "Kerberos:%s@%s" % (alt_name, realm)
self.add_attribute(dn, "altSecurityIdentities", alt_sec)
self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
mach_name = "mskilemac"
(mc, _) = self.create_account(mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -445,14 +452,15 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# Create a user account for the test.
#
samdb = self.get_samdb()
user_name = "mskileusr"
upn_name = "mskileupn"
upn = upn_name + "@" + self.credentials.get_realm().lower()
(uc, dn) = self.create_account(user_name, upn=upn)
(uc, dn) = self.create_account(samdb, user_name, upn=upn)
realm = uc.get_realm().lower()
mach_name = "mskilemac"
(mc, _) = self.create_account(mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -508,13 +516,14 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# Create a user account for the test.
#
samdb = self.get_samdb()
user_name = "mskileusr"
(uc, dn) = self.create_account(user_name)
(uc, dn) = self.create_account(samdb, user_name)
realm = uc.get_realm().lower()
ename = user_name + "@" + realm
mach_name = "mskilemac"
(mc, _) = self.create_account(mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -570,12 +579,13 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# Create a user account for the test.
#
samdb = self.get_samdb()
user_name = "mskileusr"
(uc, _) = self.create_account(user_name)
(uc, _) = self.create_account(samdb, user_name)
realm = uc.get_realm().lower()
mach_name = "mskilemac"
(mc, dn) = self.create_account(mach_name, machine_account=True)
(mc, dn) = self.create_account(samdb, mach_name, machine_account=True)
ename = mach_name + "@" + realm
uname = mach_name + "$@" + realm
@ -638,20 +648,22 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# setting UF_DONT_REQUIRE_PREAUTH seems to be the only way
# to trigger the no pre-auth step
samdb = self.get_samdb()
user_name = "mskileusr"
alt_name = "mskilealtsec"
(uc, dn) = self.create_account(user_name)
(uc, dn) = self.create_account(samdb, user_name)
realm = uc.get_realm().lower()
alt_sec = "Kerberos:%s@%s" % (alt_name, realm)
self.add_attribute(dn, "altSecurityIdentities", alt_sec)
self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
self.modify_attribute(
samdb,
dn,
"userAccountControl",
str(UF_NORMAL_ACCOUNT | UF_DONT_REQUIRE_PREAUTH))
ename = alt_name + "@" + realm
mach_name = "mskilemac"
(mc, _) = self.create_account(mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
# Do the initial AS-REQ, as we've set UF_DONT_REQUIRE_PREAUTH
# we should get a valid AS-RESP
@ -706,17 +718,18 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# Create user and machine accounts for the test.
#
samdb = self.get_samdb()
user_name = "mskileusr"
alt_name = "mskilealtsec"
(uc, dn) = self.create_account(user_name)
(uc, dn) = self.create_account(samdb, user_name)
realm = uc.get_realm().lower()
alt_sec = "Kerberos:%s@%s" % (alt_name, realm)
self.add_attribute(dn, "altSecurityIdentities", alt_sec)
self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
ename = alt_name + "@" + realm
uname = user_name + "@" + realm
mach_name = "mskilemac"
(mc, _) = self.create_account(mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
# Do the initial AS-REQ, should get a pre-authentication required
# response
@ -775,16 +788,17 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
# Create user and machine accounts for the test.
#
samdb = self.get_samdb()
user_name = "mskileusr"
alt_name = "mskilealtsec"
(uc, dn) = self.create_account(user_name)
(uc, dn) = self.create_account(samdb, user_name)
realm = uc.get_realm().lower()
alt_sec = "Kerberos:%s@%s" % (alt_name, realm)
self.add_attribute(dn, "altSecurityIdentities", alt_sec)
self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
ename = alt_name + "@" + realm
mach_name = "mskilemac"
(mc, _) = self.create_account(mach_name, machine_account=True)
(mc, _) = self.create_account(samdb, mach_name, machine_account=True)
# Do the initial AS-REQ, should get a pre-authentication required
# response

View File

@ -49,11 +49,14 @@ class CcacheTests(KDCBaseTest):
mach_name = "ccachemac"
service = "host"
samdb = self.get_samdb()
# Create the user account.
(user_credentials, _) = self.create_account(user_name)
(user_credentials, _) = self.create_account(samdb, user_name)
# Create the machine account.
(mach_credentials, _) = self.create_account(mach_name,
(mach_credentials, _) = self.create_account(samdb,
mach_name,
machine_account=True,
spn="%s/%s" % (service,
mach_name))
@ -77,7 +80,7 @@ class CcacheTests(KDCBaseTest):
gensec_client.want_feature(gensec.FEATURE_SEAL)
gensec_client.start_mech_by_sasl_name("GSSAPI")
auth_context = AuthContext(lp_ctx=self.lp, ldb=self.ldb, methods=[])
auth_context = AuthContext(lp_ctx=self.lp, ldb=samdb, methods=[])
gensec_server = gensec.Security.start_server(settings, auth_context)
gensec_server.set_credentials(mach_credentials)
@ -104,9 +107,9 @@ class CcacheTests(KDCBaseTest):
# token is the SID of the user we created.
# Retrieve the user account's SID.
ldb_res = self.ldb.search(scope=SCOPE_SUBTREE,
expression="(sAMAccountName=%s)" % user_name,
attrs=["objectSid"])
ldb_res = samdb.search(scope=SCOPE_SUBTREE,
expression="(sAMAccountName=%s)" % user_name,
attrs=["objectSid"])
self.assertEqual(1, len(ldb_res))
sid = ndr_unpack(security.dom_sid, ldb_res[0]["objectSid"][0])

View File

@ -44,12 +44,14 @@ class LdapTests(KDCBaseTest):
# credentials cache file where the service ticket authenticating the
# user are stored.
samdb = self.get_samdb()
user_name = "ldapusr"
mach_name = self.dns_host_name
mach_name = samdb.host_dns_name()
service = "ldap"
# Create the user account.
(user_credentials, _) = self.create_account(user_name)
(user_credentials, _) = self.create_account(samdb, user_name)
# Talk to the KDC to obtain the service ticket, which gets placed into
# the cache. The machine account name has to match the name in the
@ -63,9 +65,9 @@ class LdapTests(KDCBaseTest):
# cached credentials.
# Retrieve the user account's SID.
ldb_res = self.ldb.search(scope=SCOPE_SUBTREE,
expression="(sAMAccountName=%s)" % user_name,
attrs=["objectSid"])
ldb_res = samdb.search(scope=SCOPE_SUBTREE,
expression="(sAMAccountName=%s)" % user_name,
attrs=["objectSid"])
self.assertEqual(1, len(ldb_res))
sid = ndr_unpack(security.dom_sid, ldb_res[0]["objectSid"][0])

View File

@ -41,12 +41,14 @@ class RpcTests(KDCBaseTest):
# credentials cache file where the service ticket authenticating the
# user are stored.
samdb = self.get_samdb()
user_name = "rpcusr"
mach_name = self.dns_host_name
mach_name = samdb.host_dns_name()
service = "cifs"
# Create the user account.
(user_credentials, _) = self.create_account(user_name)
(user_credentials, _) = self.create_account(samdb, user_name)
# Talk to the KDC to obtain the service ticket, which gets placed into
# the cache. The machine account name has to match the name in the

View File

@ -45,13 +45,15 @@ class SmbTests(KDCBaseTest):
# credentials cache file where the service ticket authenticating the
# user are stored.
samdb = self.get_samdb()
user_name = "smbusr"
mach_name = self.dns_host_name
mach_name = samdb.host_dns_name()
service = "cifs"
share = "tmp"
# Create the user account.
(user_credentials, _) = self.create_account(user_name)
(user_credentials, _) = self.create_account(samdb, user_name)
# Talk to the KDC to obtain the service ticket, which gets placed into
# the cache. The machine account name has to match the name in the
@ -72,9 +74,9 @@ class SmbTests(KDCBaseTest):
# cached credentials.
# Retrieve the user account's SID.
ldb_res = self.ldb.search(scope=SCOPE_SUBTREE,
expression="(sAMAccountName=%s)" % user_name,
attrs=["objectSid"])
ldb_res = samdb.search(scope=SCOPE_SUBTREE,
expression="(sAMAccountName=%s)" % user_name,
attrs=["objectSid"])
self.assertEqual(1, len(ldb_res))
sid = ndr_unpack(security.dom_sid, ldb_res[0]["objectSid"][0])