1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-12 09:18:10 +03:00

tests/drs: extend getnc_exop test to check linked attributes

Assert that linked attributes propagate across DRS and come in a
particular sorted order.

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11960
This commit is contained in:
Garming Sam 2016-06-08 11:10:58 +12:00 committed by Andrew Bartlett
parent d2ebe2d17d
commit ed6a423232

View File

@ -36,7 +36,46 @@ from ldb import SCOPE_BASE
from samba.dcerpc import drsuapi, misc, drsblobs from samba.dcerpc import drsuapi, misc, drsblobs
from samba.drs_utils import drs_DsBind from samba.drs_utils import drs_DsBind
from samba.ndr import ndr_unpack
def _linked_attribute_compare(la1, la2):
"""See CompareLinks() in MS-DRSR section 4.1.10.5.17"""
la1, la1_target = la1
la2, la2_target = la2
# Ascending host object GUID
c = cmp(la1.identifier.guid, la2.identifier.guid)
if c != 0:
return c
# Ascending attribute ID
if la1.attid != la2.attid:
return -1 if la1.attid < la2.attid else 1
la1_active = la1.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
la2_active = la2.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
# Ascending 'is present'
if la1_active != la2_active:
return 1 if la1_active else -1
# Ascending target object GUID
return cmp(la1_target, la2_target)
class AbstractLink:
def __init__(self, attid, flags, identifier, targetGUID):
self.attid = attid
self.flags = flags
self.identifier = identifier
self.targetGUID = targetGUID
def __eq__(self, other):
return isinstance(other, AbstractLink) and \
((self.attid, self.flags, self.identifier, self.targetGUID) ==
(other.attid, other.flags, other.identifier, other.targetGUID))
def __hash__(self):
return hash((self.attid, self.flags, self.identifier, self.targetGUID))
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase): class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
"""Intended as a semi-black box test case for DsGetNCChanges """Intended as a semi-black box test case for DsGetNCChanges
@ -55,7 +94,7 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
replica_flags=0): replica_flags=0):
req8 = drsuapi.DsGetNCChangesRequest8() req8 = drsuapi.DsGetNCChangesRequest8()
req8.destination_dsa_guid = misc.GUID(dest_dsa) req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
req8.source_dsa_invocation_id = misc.GUID(invocation_id) req8.source_dsa_invocation_id = misc.GUID(invocation_id)
req8.naming_context = drsuapi.DsReplicaObjectIdentifier() req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
req8.naming_context.dn = unicode(nc_dn_str) req8.naming_context.dn = unicode(nc_dn_str)
@ -243,3 +282,139 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
self.assertEqual(ctr6.drs_error[0], 0) self.assertEqual(ctr6.drs_error[0], 0)
# We don't check the linked_attributes_count as if the domain # We don't check the linked_attributes_count as if the domain
# has an RODC, it can gain links on the server account object # has an RODC, it can gain links on the server account object
def test_sort_behaviour_single_object(self):
"""Testing sorting behaviour on single objects"""
self.base_dn = self.ldb_dc1.get_default_basedn()
self.ou = "ou=sort_exop,%s" % self.base_dn
self.ldb_dc1.add({
"dn": self.ou,
"objectclass": "organizationalUnit"})
user1_dn = "cn=test_user1,%s" % self.ou
user2_dn = "cn=test_user2,%s" % self.ou
user3_dn = "cn=test_user3,%s" % self.ou
group_dn = "cn=test_group,%s" % self.ou
self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
self.ldb_dc1.add({"dn": user2_dn, "objectclass": "user"})
self.ldb_dc1.add({"dn": user3_dn, "objectclass": "user"})
self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})
u1_guid = str(misc.GUID(self.ldb_dc1.search(base=user1_dn,
attrs=["objectGUID"])[0]['objectGUID'][0]))
u2_guid = str(misc.GUID(self.ldb_dc1.search(base=user2_dn,
attrs=["objectGUID"])[0]['objectGUID'][0]))
u3_guid = str(misc.GUID(self.ldb_dc1.search(base=user3_dn,
attrs=["objectGUID"])[0]['objectGUID'][0]))
g_guid = str(misc.GUID(self.ldb_dc1.search(base=group_dn,
attrs=["objectGUID"])[0]['objectGUID'][0]))
self.add_linked_attribute(group_dn, user1_dn,
attr='member')
self.add_linked_attribute(group_dn, user2_dn,
attr='member')
self.add_linked_attribute(group_dn, user3_dn,
attr='member')
self.add_linked_attribute(group_dn, user1_dn,
attr='managedby')
self.add_linked_attribute(group_dn, user2_dn,
attr='nonSecurityMember')
self.add_linked_attribute(group_dn, user3_dn,
attr='nonSecurityMember')
set_inactive = AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
g_guid, u3_guid)
expected_links = set([set_inactive,
AbstractLink(drsuapi.DRSUAPI_ATTID_member,
drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
g_guid,
u1_guid),
AbstractLink(drsuapi.DRSUAPI_ATTID_member,
drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
g_guid,
u2_guid),
AbstractLink(drsuapi.DRSUAPI_ATTID_member,
drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
g_guid,
u3_guid),
AbstractLink(drsuapi.DRSUAPI_ATTID_managedBy,
drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
g_guid,
u1_guid),
AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
g_guid,
u2_guid),
])
dc_guid_1 = self.ldb_dc1.get_invocation_id()
drs, drs_handle = self._ds_bind(self.dnsname_dc1)
req8 = self._exop_req8(dest_dsa=None,
invocation_id=dc_guid_1,
nc_dn_str=group_dn,
exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
no_inactive = []
for link in ctr.linked_attributes:
target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
link.value.blob).guid
no_inactive.append((link, target_guid))
self.assertTrue(AbstractLink(link.attid, link.flags,
str(link.identifier.guid),
str(target_guid)) in expected_links)
no_inactive.sort(cmp=_linked_attribute_compare)
# assert the two arrays are the same
self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)
self.assertEqual(len(expected_links), ctr.linked_attributes_count)
self.remove_linked_attribute(group_dn, user3_dn,
attr='nonSecurityMember')
# Set the link inactive
expected_links.remove(set_inactive)
set_inactive.flags = 0
expected_links.add(set_inactive)
has_inactive = []
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
for link in ctr.linked_attributes:
target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
link.value.blob).guid
has_inactive.append((link, target_guid))
self.assertTrue(AbstractLink(link.attid, link.flags,
str(link.identifier.guid),
str(target_guid)) in expected_links)
has_inactive.sort(cmp=_linked_attribute_compare)
# assert the two arrays are the same
self.assertEqual([x[0] for x in has_inactive], ctr.linked_attributes)
self.assertEqual(len(expected_links), ctr.linked_attributes_count)
# tidyup groups and users
try:
self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
except ldb.LdbError as (enum, string):
if enum == ldb.ERR_NO_SUCH_OBJECT:
pass
def add_linked_attribute(self, src, dest, attr='member'):
m = ldb.Message()
m.dn = ldb.Dn(self.ldb_dc1, src)
m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
self.ldb_dc1.modify(m)
def remove_linked_attribute(self, src, dest, attr='member'):
m = ldb.Message()
m.dn = ldb.Dn(self.ldb_dc1, src)
m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
self.ldb_dc1.modify(m)