1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-26 10:04:02 +03:00

s4-selftest/drs Add test of expected return code for invaid DNs in GetNCChanges

BUG: https://bugzilla.samba.org/show_bug.cgi?id=10635

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
This commit is contained in:
Andrew Bartlett 2022-12-02 10:07:53 +13:00 committed by Stefan Metzmacher
parent 2c7bb58703
commit bee45e6b29
3 changed files with 88 additions and 3 deletions

View File

@ -4,3 +4,8 @@ samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegri
samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegrityTestCase.test_repl_get_tgt_chain\(promoted_dc\)
samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegrityTestCase.test_repl_get_tgt_and_anc\(promoted_dc\)
samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegrityTestCase.test_repl_get_tgt_multivalued_links\(promoted_dc\)
# New tests for GetNCChanges with a GUID and a bad DN, like Azure AD Cloud Sync
^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidDestDSA_and_GUID
^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID
^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ
^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID_RID_ALLOC

View File

@ -432,13 +432,15 @@ class DrsBaseTestCase(SambaToolCmdTest):
def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
replica_flags=0, max_objects=0, partial_attribute_set=None,
partial_attribute_set_ex=None, mapping_ctr=None):
partial_attribute_set_ex=None, mapping_ctr=None, nc_guid=None):
req8 = drsuapi.DsGetNCChangesRequest8()
req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
req8.source_dsa_invocation_id = misc.GUID(invocation_id)
req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
req8.naming_context.dn = str(nc_dn_str)
if nc_guid is not None:
req8.naming_context.guid = nc_guid
req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
req8.highwatermark.tmp_highest_usn = 0
req8.highwatermark.reserved_usn = 0

View File

@ -240,6 +240,60 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
(enum, estr) = e1.args
self.assertEqual(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
def test_InvalidNC_DummyDN_InvalidGUID(self):
"""Test full replication on a totally invalid GUID fails with the right error code"""
fsmo_dn = self.ldb_dc1.get_schema_basedn()
(fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
invocation_id=fsmo_owner["invocation_id"],
nc_dn_str="DummyDN",
nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
exop=drsuapi.DRSUAPI_EXOP_NONE)
(drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
try:
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
except WERRORError as e1:
(enum, estr) = e1.args
self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC)
def test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ(self):
"""Test single object replication on a totally invalid GUID fails with the right error code"""
fsmo_dn = self.ldb_dc1.get_schema_basedn()
(fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
invocation_id=fsmo_owner["invocation_id"],
nc_dn_str="DummyDN",
nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
(drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
try:
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
except WERRORError as e1:
(enum, estr) = e1.args
self.assertEqual(enum, werror.WERR_DS_DRA_BAD_DN)
def test_InvalidNC_DummyDN_InvalidGUID_RID_ALLOC(self):
"""Test RID Allocation on a totally invalid GUID fails with the right error code"""
fsmo_dn = self.ldb_dc1.get_schema_basedn()
(fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
invocation_id=fsmo_owner["invocation_id"],
nc_dn_str="DummyDN",
nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
(drs, drs_handle) = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
try:
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
except WERRORError as e1:
(enum, estr) = e1.args
self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC)
def test_link_utdv_hwm(self):
"""Test verify the DRS_GET_ANC behavior."""
@ -597,12 +651,35 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
def test_InvalidDestDSA_and_GUID(self):
"""Test role transfer with invalid destination DSA guid"""
fsmo_dn = self.ldb_dc1.get_schema_basedn()
(fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
invocation_id=fsmo_owner["invocation_id"],
nc_dn_str="DummyDN",
nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)
(drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
try:
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
except WERRORError as e1:
(enum, estr) = e1.args
self.fail("DsGetNCChanges failed with {estr}")
self.assertEqual(level, 6, "Expected level 6 response!")
self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
super(DrsReplicaPrefixMapTestCase, self).setUp()
self.base_dn = self.ldb_dc1.get_default_basedn()
self.ou = "ou=pfm_exop,%s" % self.base_dn
self.ou = "ou=pfm_exop%d,%s" % (random.randint(0, 4294967295),
self.base_dn)
self.ldb_dc1.add({
"dn": self.ou,
"objectclass": "organizationalUnit"})
@ -948,7 +1025,8 @@ class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
super(DrsReplicaSyncSortTestCase, self).setUp()
self.base_dn = self.ldb_dc1.get_default_basedn()
self.ou = "ou=sort_exop,%s" % self.base_dn
self.ou = "ou=sort_exop%d,%s" % (random.randint(0, 4294967295),
self.base_dn)
self.ldb_dc1.add({
"dn": self.ou,
"objectclass": "organizationalUnit"})