1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-24 13:57:43 +03:00

python:tests/krb5: let create_trust() take {ingress,egress}_claims_tf_rules

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>

Autobuild-User(master): Ralph Böhme <slow@samba.org>
Autobuild-Date(master): Mon Feb 24 10:28:02 UTC 2025 on atb-devel-224
This commit is contained in:
Stefan Metzmacher 2025-02-05 09:15:47 +01:00 committed by Ralph Boehme
parent b1348ad288
commit 2dba2a31c2

View File

@ -103,6 +103,10 @@ from samba.join import DCJoinContext
from samba.ndr import ndr_pack, ndr_unpack
from samba.param import LoadParm
from samba.samdb import SamDB, dsdb_Dn
from samba.security import (
claims_tf_policy_parse_rules,
claims_tf_policy_wrap_xml,
)
rc4_bit = security.KERB_ENCTYPE_RC4_HMAC_MD5
aes256_sk_bit = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
@ -206,6 +210,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
cls.ldb_cleanups = []
cls._claim_types_dn = None
cls._claim_tf_policies_dn = None
cls._authn_policy_config_dn = None
cls._authn_policies_dn = None
cls._authn_silos_dn = None
@ -250,6 +255,46 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
# Return a copy of the DN.
return ldb.Dn(samdb, str(self._claim_types_dn))
def get_claim_tf_policies_dn(self):
samdb = self.get_samdb()
if self._claim_tf_policies_dn is None:
claim_config_dn = samdb.get_config_basedn()
claim_config_dn.add_child('CN=Claims Configuration,CN=Services')
details = {
'dn': claim_config_dn,
'objectClass': 'container',
}
try:
samdb.add(details)
except ldb.LdbError as err:
num, _ = err.args
if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
raise
else:
self.accounts.append(str(claim_config_dn))
claim_tf_policies_dn = claim_config_dn
claim_tf_policies_dn.add_child('CN=Claims Transformation Policies')
details = {
'dn': claim_tf_policies_dn,
'objectClass': 'msDS-ClaimsTransformationPolicies',
}
try:
samdb.add(details)
except ldb.LdbError as err:
num, _ = err.args
if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
raise
else:
self.accounts.append(str(claim_tf_policies_dn))
type(self)._claim_tf_policies_dn = claim_tf_policies_dn
# Return a copy of the DN.
return ldb.Dn(samdb, str(self._claim_tf_policies_dn))
def get_authn_policy_config_dn(self):
samdb = self.get_samdb()
@ -912,6 +957,8 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
forest_info=None,
trust_incoming_password=None,
trust_outgoing_password=None,
ingress_claims_tf_rules=None,
egress_claims_tf_rules=None,
expect_error=None,
preserve=True):
"""Create an trust account for testing.
@ -1011,6 +1058,42 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
samdb = self.get_samdb()
def claims_tf_policy_dn(name, rules):
xml_rules = claims_tf_policy_wrap_xml(rules)
rs = claims_tf_policy_parse_rules(xml_rules, strip_xml=True)
policy_dn = self.get_claim_tf_policies_dn()
policy_dn.add_child('CN=%s' % name)
details = {
'dn': policy_dn,
'objectClass': 'msDS-ClaimsTransformationPolicyType',
'msDS-TransformationRules': xml_rules,
}
try:
samdb.add(details)
except ldb.LdbError as err:
num, _ = err.args
if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
raise
raise
else:
self.accounts.append(str(policy_dn))
return policy_dn
if ingress_claims_tf_rules is not None:
ingress_policy_dn = claims_tf_policy_dn("%s-Ingress" %
trust_dns_name.string,
ingress_claims_tf_rules)
else:
ingress_policy_dn = None
if egress_claims_tf_rules is not None:
egress_policy_dn = claims_tf_policy_dn("%s-Egress" %
trust_dns_name.string,
egress_claims_tf_rules)
else:
egress_policy_dn = None
incoming_account_name = trust_info.netbios_name.string
incoming_account_name += '$'
incoming_nbt_domain = local_info.name.string
@ -1029,6 +1112,22 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
self.assertEqual(len(tdo_res), 1)
tdo_dn = tdo_res[0].dn
if ingress_policy_dn or egress_policy_dn:
msg = ldb.Message()
msg.dn = tdo_dn
if ingress_policy_dn:
msg['ingress'] = ldb.MessageElement(
str(ingress_policy_dn),
ldb.FLAG_MOD_REPLACE,
'msDS-IngressClaimsTransformationPolicy')
if egress_policy_dn:
msg['egress'] = ldb.MessageElement(
str(egress_policy_dn),
ldb.FLAG_MOD_REPLACE,
'msDS-EgressClaimsTransformationPolicy')
samdb.modify(msg)
acct_search_filter = "(&(objectClass=user)(sAMAccountName=%s))" % (
incoming_account_name)
acct_res = samdb.search(scope=ldb.SCOPE_SUBTREE,