1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-26 10:04:02 +03:00

msds_intid: Add test for schema linked attributes

This test only covers the forward link case.

NOTE: We can't confirm this against Windows because they prevent us from
modifying the schema for the schema classes.

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Garming Sam 2016-07-26 16:57:24 +12:00 committed by Stefan Metzmacher
parent fa6411657f
commit 0555443213
2 changed files with 87 additions and 1 deletions

View File

@ -286,3 +286,4 @@
^samba4.krb5.kdc.*as-req-aes.*fl2000dc
# nt4_member and ad_member don't support ntlmv1
^samba3.blackbox.smbclient_auth.plain.*_member.*option=clientntlmv2auth=no.member.creds.*as.user
^samba4.drs.repl_schema.python.*repl_schema.DrsReplSchemaTestCase.test_classWithCustomLinkAttribute

View File

@ -41,7 +41,8 @@ from ldb import (
import ldb
import drs_base
from samba.dcerpc import drsuapi, misc
from samba.drs_utils import drs_DsBind
class DrsReplSchemaTestCase(drs_base.DrsBaseTestCase):
@ -50,6 +51,38 @@ class DrsReplSchemaTestCase(drs_base.DrsBaseTestCase):
# current Class or Attribute object id
obj_id = 0
def _ds_bind(self, server_name):
binding_str = "ncacn_ip_tcp:%s[seal]" % server_name
drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
(drs_handle, supported_extensions) = drs_DsBind(drs)
return (drs, drs_handle)
def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
replica_flags=0, max_objects=0):
req8 = drsuapi.DsGetNCChangesRequest8()
req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
req8.source_dsa_invocation_id = misc.GUID(invocation_id)
req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
req8.naming_context.dn = unicode(nc_dn_str)
req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
req8.highwatermark.tmp_highest_usn = 0
req8.highwatermark.reserved_usn = 0
req8.highwatermark.highest_usn = 0
req8.uptodateness_vector = None
req8.replica_flags = replica_flags
req8.max_object_count = max_objects
req8.max_ndr_size = 402116
req8.extended_op = exop
req8.fsmo_info = 0
req8.partial_attribute_set = None
req8.partial_attribute_set_ex = None
req8.mapping_ctr.num_mappings = 0
req8.mapping_ctr.mappings = None
return req8
def setUp(self):
super(DrsReplSchemaTestCase, self).setUp()
@ -202,6 +235,58 @@ class DrsReplSchemaTestCase(drs_base.DrsBaseTestCase):
self._check_object(c_dn)
self._check_object(a_dn)
def test_classWithCustomLinkAttribute(self):
"""Create new Attribute and a Class,
that has value for newly created attribute.
This should check code path that searches for
AttributeID_id in Schema cache"""
# add new attributeSchema object
(a_ldn, a_dn) = self._schema_new_attr(self.ldb_dc1, "attr-Link-X", 1,
attrs={'linkID':"99990",
"attributeSyntax": "2.5.5.1",
"omSyntax": "127"})
# add a base classSchema class so we can use our new
# attribute in class definition in a sibling class
(c_ldn, c_dn) = self._schema_new_class(self.ldb_dc1, "cls-Link-Y", 7,
1,
{"systemMayContain": a_ldn,
"subClassOf": "classSchema"})
# add new classSchema object with value for a_ldb attribute
(c_ldn, c_dn) = self._schema_new_class(self.ldb_dc1, "cls-Link-Z", 8,
1,
{"objectClass": ["top", "classSchema", c_ldn],
a_ldn: self.schema_dn})
# force replication from DC1 to DC2
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, nc_dn=self.schema_dn, forced=True)
# check objects are replicated
self._check_object(c_dn)
self._check_object(a_dn)
res = self.ldb_dc1.search(base=a_dn,
scope=SCOPE_BASE,
attrs=["msDS-IntId"])
self.assertEqual(1, len(res))
self.assertTrue("msDS-IntId" in res[0])
int_id = int(res[0]["msDS-IntId"][0])
if int_id < 0:
int_id += (1 << 32)
dc_guid_1 = self.ldb_dc1.get_invocation_id()
drs, drs_handle = self._ds_bind(self.dnsname_dc1)
req8 = self._exop_req8(dest_dsa=None,
invocation_id=dc_guid_1,
nc_dn_str=c_dn,
exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
replica_flags=drsuapi.DRSUAPI_DRS_SYNC_FORCED)
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
for link in ctr.linked_attributes:
self.assertTrue(link.attid != int_id,
'Got %d for both' % link.attid)
def test_attribute(self):
"""Simple test for attributeSchema replication"""
# add new attributeSchema object