mirror of
https://github.com/samba-team/samba.git
synced 2025-01-25 06:04:04 +03:00
python/tests: Add repl_rodc test
Currently, this tests the msDS-RevealedUsers feature, which we don't support at the moment. Signed-off-by: Bob Campbell <bobcampbell@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org> Pair-programmed-with: Garming Sam <garming@catalyst.net.nz>
This commit is contained in:
parent
b0d37f6ca1
commit
325f8e88c5
@ -315,3 +315,5 @@
|
|||||||
^samba3.smb2.credits.session_setup_credits_granted.*
|
^samba3.smb2.credits.session_setup_credits_granted.*
|
||||||
^samba3.smb2.credits.single_req_credits_granted.*
|
^samba3.smb2.credits.single_req_credits_granted.*
|
||||||
^samba3.smb2.credits.skipped_mid.*
|
^samba3.smb2.credits.skipped_mid.*
|
||||||
|
# We don't yet support msDS-RevealedUsers
|
||||||
|
^samba4.drs.repl_rodc.python.*repl_rodc.*
|
||||||
|
@ -720,6 +720,13 @@ for env in ['vampire_dc', 'promoted_dc', 'vampire_2000_dc']:
|
|||||||
environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
|
environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
|
||||||
extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
|
extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
|
||||||
|
|
||||||
|
for env in ['ad_dc_ntvfs']:
|
||||||
|
planoldpythontestsuite(env, "repl_rodc",
|
||||||
|
extra_path=[os.path.join(samba4srcdir, 'torture/drs/python')],
|
||||||
|
name="samba4.drs.repl_rodc.python(%s)" % env,
|
||||||
|
environ={'DC1': "$DC_SERVER", 'DC2': '$DC_SERVER'},
|
||||||
|
extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
|
||||||
|
|
||||||
planoldpythontestsuite("chgdcpass:local", "samba.tests.blackbox.samba_dnsupdate",
|
planoldpythontestsuite("chgdcpass:local", "samba.tests.blackbox.samba_dnsupdate",
|
||||||
environ={'DNS_SERVER_IP': '$SERVER_IP'})
|
environ={'DNS_SERVER_IP': '$SERVER_IP'})
|
||||||
|
|
||||||
|
416
source4/torture/drs/python/repl_rodc.py
Normal file
416
source4/torture/drs/python/repl_rodc.py
Normal file
@ -0,0 +1,416 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# Test replication scenarios involving an RODC
|
||||||
|
#
|
||||||
|
# Copyright (C) Catalyst.Net Ltd. 2017
|
||||||
|
#
|
||||||
|
# This program is free software; you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation; either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
#
|
||||||
|
|
||||||
|
#
|
||||||
|
# Usage:
|
||||||
|
# export DC1=dc1_dns_name
|
||||||
|
# export DC2=dc1_dns_name [this is unused for the test, but it'll still try to connect]
|
||||||
|
# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
|
||||||
|
# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_rodc -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
|
||||||
|
#
|
||||||
|
|
||||||
|
import drs_base
|
||||||
|
import samba.tests
|
||||||
|
import ldb
|
||||||
|
from ldb import SCOPE_BASE
|
||||||
|
|
||||||
|
from samba import WERRORError
|
||||||
|
from samba.join import dc_join
|
||||||
|
from samba.dcerpc import drsuapi, misc, drsblobs, security
|
||||||
|
from samba.drs_utils import drs_DsBind, drs_Replicate
|
||||||
|
from samba.ndr import ndr_unpack, ndr_pack
|
||||||
|
from samba.common import dsdb_Dn
|
||||||
|
from samba.credentials import Credentials
|
||||||
|
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
|
||||||
|
def drs_get_rodc_partial_attribute_set(samdb, samdb1, exceptions=[]):
|
||||||
|
'''get a list of attributes for RODC replication'''
|
||||||
|
partial_attribute_set = drsuapi.DsPartialAttributeSet()
|
||||||
|
partial_attribute_set.version = 1
|
||||||
|
|
||||||
|
attids = []
|
||||||
|
|
||||||
|
# the exact list of attids we send is quite critical. Note that
|
||||||
|
# we do ask for the secret attributes, but set SPECIAL_SECRET_PROCESSING
|
||||||
|
# to zero them out
|
||||||
|
schema_dn = samdb.get_schema_basedn()
|
||||||
|
res = samdb.search(base=schema_dn, scope=ldb.SCOPE_SUBTREE,
|
||||||
|
expression="objectClass=attributeSchema",
|
||||||
|
attrs=["lDAPDisplayName", "systemFlags",
|
||||||
|
"searchFlags"])
|
||||||
|
|
||||||
|
for r in res:
|
||||||
|
ldap_display_name = r["lDAPDisplayName"][0]
|
||||||
|
if "systemFlags" in r:
|
||||||
|
system_flags = r["systemFlags"][0]
|
||||||
|
if (int(system_flags) & (samba.dsdb.DS_FLAG_ATTR_NOT_REPLICATED |
|
||||||
|
samba.dsdb.DS_FLAG_ATTR_IS_CONSTRUCTED)):
|
||||||
|
continue
|
||||||
|
if "searchFlags" in r:
|
||||||
|
search_flags = r["searchFlags"][0]
|
||||||
|
if (int(search_flags) & samba.dsdb.SEARCH_FLAG_RODC_ATTRIBUTE):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
attid = samdb1.get_attid_from_lDAPDisplayName(ldap_display_name)
|
||||||
|
if not attid in exceptions:
|
||||||
|
attids.append(int(attid))
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# the attids do need to be sorted, or windows doesn't return
|
||||||
|
# all the attributes we need
|
||||||
|
attids.sort()
|
||||||
|
partial_attribute_set.attids = attids
|
||||||
|
partial_attribute_set.num_attids = len(attids)
|
||||||
|
return partial_attribute_set
|
||||||
|
|
||||||
|
class DrsRodcTestCase(drs_base.DrsBaseTestCase):
|
||||||
|
"""Intended as a semi-black box test case for replication involving
|
||||||
|
an RODC."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(DrsRodcTestCase, self).setUp()
|
||||||
|
self.base_dn = self.ldb_dc1.get_default_basedn()
|
||||||
|
|
||||||
|
rand = random.randint(1, 10000000)
|
||||||
|
|
||||||
|
self.ou = "OU=test_drs_rodc%s,%s" % (rand, self.base_dn)
|
||||||
|
self.ldb_dc1.add({
|
||||||
|
"dn": self.ou,
|
||||||
|
"objectclass": "organizationalUnit"
|
||||||
|
})
|
||||||
|
self.allowed_group = "CN=Allowed RODC Password Replication Group,CN=Users,%s" % self.base_dn
|
||||||
|
|
||||||
|
self.site = self.ldb_dc1.server_site_name()
|
||||||
|
self.rodc_name = "TESTRODCDRS%s" % rand
|
||||||
|
self.rodc_pass = "password12#"
|
||||||
|
self.computer_dn = "CN=%s,OU=Domain Controllers,%s" % (self.rodc_name, self.base_dn)
|
||||||
|
|
||||||
|
|
||||||
|
self.rodc_ctx = dc_join(server=self.ldb_dc1.host_dns_name(), creds=self.get_credentials(), lp=self.get_loadparm(),
|
||||||
|
site=self.site, netbios_name=self.rodc_name,
|
||||||
|
targetdir=None, domain=None, machinepass=self.rodc_pass)
|
||||||
|
self._create_rodc(self.rodc_ctx)
|
||||||
|
self.rodc_ctx.create_tmp_samdb()
|
||||||
|
self.tmp_samdb = self.rodc_ctx.tmp_samdb
|
||||||
|
|
||||||
|
rodc_creds = Credentials()
|
||||||
|
rodc_creds.guess(self.rodc_ctx.lp)
|
||||||
|
rodc_creds.set_username(self.rodc_name+'$')
|
||||||
|
rodc_creds.set_password(self.rodc_pass)
|
||||||
|
|
||||||
|
(self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
|
||||||
|
(self.rodc_drs, self.rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, rodc_creds)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.rodc_ctx.cleanup_old_join()
|
||||||
|
super(DrsRodcTestCase, self).tearDown()
|
||||||
|
|
||||||
|
def test_admin_repl_secrets(self):
|
||||||
|
"""
|
||||||
|
When a secret attribute is set to be replicated to an RODC with the
|
||||||
|
admin credentials, it should always replicate regardless of whether
|
||||||
|
or not it's in the Allowed RODC Password Replication Group.
|
||||||
|
"""
|
||||||
|
rand = random.randint(1, 10000000)
|
||||||
|
expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
|
||||||
|
drsuapi.DRSUAPI_ATTID_supplementalCredentials,
|
||||||
|
drsuapi.DRSUAPI_ATTID_ntPwdHistory,
|
||||||
|
drsuapi.DRSUAPI_ATTID_unicodePwd,
|
||||||
|
drsuapi.DRSUAPI_ATTID_dBCSPwd]
|
||||||
|
|
||||||
|
user_name = "test_rodcA_%s" % rand
|
||||||
|
user_dn = "CN=%s,%s" % (user_name, self.ou)
|
||||||
|
self.ldb_dc1.add({
|
||||||
|
"dn": user_dn,
|
||||||
|
"objectclass": "user",
|
||||||
|
"sAMAccountName": user_name
|
||||||
|
})
|
||||||
|
|
||||||
|
# Store some secret on this user
|
||||||
|
self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
|
||||||
|
|
||||||
|
req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
|
||||||
|
invocation_id=self.ldb_dc1.get_invocation_id(),
|
||||||
|
nc_dn_str=user_dn,
|
||||||
|
exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
|
||||||
|
partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
|
||||||
|
max_objects=133,
|
||||||
|
replica_flags=0)
|
||||||
|
(level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
|
||||||
|
|
||||||
|
# Check that the user has been added to msDSRevealedUsers
|
||||||
|
self._assert_in_revealed_users(user_dn, expected_user_attributes)
|
||||||
|
|
||||||
|
def test_rodc_repl_secrets(self):
|
||||||
|
"""
|
||||||
|
When a secret attribute is set to be replicated to an RODC with
|
||||||
|
the RODC account credentials, it should not replicate if it's in
|
||||||
|
the Allowed RODC Password Replication Group. Once it is added to
|
||||||
|
the group, it should replicate.
|
||||||
|
"""
|
||||||
|
rand = random.randint(1, 10000000)
|
||||||
|
expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
|
||||||
|
drsuapi.DRSUAPI_ATTID_supplementalCredentials,
|
||||||
|
drsuapi.DRSUAPI_ATTID_ntPwdHistory,
|
||||||
|
drsuapi.DRSUAPI_ATTID_unicodePwd,
|
||||||
|
drsuapi.DRSUAPI_ATTID_dBCSPwd]
|
||||||
|
|
||||||
|
user_name = "test_rodcB_%s" % rand
|
||||||
|
user_dn = "CN=%s,%s" % (user_name, self.ou)
|
||||||
|
self.ldb_dc1.add({
|
||||||
|
"dn": user_dn,
|
||||||
|
"objectclass": "user",
|
||||||
|
"sAMAccountName": user_name
|
||||||
|
})
|
||||||
|
|
||||||
|
# Store some secret on this user
|
||||||
|
self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, 'penguin12#', False, user_name)
|
||||||
|
|
||||||
|
req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
|
||||||
|
invocation_id=self.ldb_dc1.get_invocation_id(),
|
||||||
|
nc_dn_str=user_dn,
|
||||||
|
exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
|
||||||
|
partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
|
||||||
|
max_objects=133,
|
||||||
|
replica_flags=0)
|
||||||
|
|
||||||
|
try:
|
||||||
|
(level, ctr) = self.rodc_drs.DsGetNCChanges(self.rodc_drs_handle, 10, req10)
|
||||||
|
self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
|
||||||
|
except WERRORError as (enum, estr):
|
||||||
|
self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
|
||||||
|
|
||||||
|
# Retry with Administrator credentials, ignores password replication groups
|
||||||
|
(level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
|
||||||
|
|
||||||
|
# Check that the user has been added to msDSRevealedUsers
|
||||||
|
self._assert_in_revealed_users(user_dn, expected_user_attributes)
|
||||||
|
|
||||||
|
def test_msDSRevealedUsers(self):
|
||||||
|
"""
|
||||||
|
When a secret attribute is to be replicated to an RODC, the contents
|
||||||
|
of the attribute should be added to the msDSRevealedUsers attribute
|
||||||
|
of the computer object corresponding to the RODC.
|
||||||
|
"""
|
||||||
|
|
||||||
|
rand = random.randint(1, 10000000)
|
||||||
|
expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
|
||||||
|
drsuapi.DRSUAPI_ATTID_supplementalCredentials,
|
||||||
|
drsuapi.DRSUAPI_ATTID_ntPwdHistory,
|
||||||
|
drsuapi.DRSUAPI_ATTID_unicodePwd,
|
||||||
|
drsuapi.DRSUAPI_ATTID_dBCSPwd]
|
||||||
|
|
||||||
|
# Add a user on DC1, add it to allowed password replication
|
||||||
|
# group, and replicate to RODC with EXOP_REPL_SECRETS
|
||||||
|
user_name = "test_rodcC_%s" % rand
|
||||||
|
password = "password12#"
|
||||||
|
user_dn = "CN=%s,%s" % (user_name, self.ou)
|
||||||
|
self.ldb_dc1.add({
|
||||||
|
"dn": user_dn,
|
||||||
|
"objectclass": "user",
|
||||||
|
"sAMAccountName": user_name
|
||||||
|
})
|
||||||
|
|
||||||
|
# Store some secret on this user
|
||||||
|
self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
|
||||||
|
|
||||||
|
self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
|
||||||
|
[user_name],
|
||||||
|
add_members_operation=True)
|
||||||
|
|
||||||
|
req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
|
||||||
|
invocation_id=self.ldb_dc1.get_invocation_id(),
|
||||||
|
nc_dn_str=user_dn,
|
||||||
|
exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
|
||||||
|
partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
|
||||||
|
max_objects=133,
|
||||||
|
replica_flags=0)
|
||||||
|
(level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
|
||||||
|
|
||||||
|
# Check that the user has been added to msDSRevealedUsers
|
||||||
|
(packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
|
||||||
|
|
||||||
|
# Change the user's password on DC1
|
||||||
|
self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password+"1", False, user_name)
|
||||||
|
|
||||||
|
(packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
|
||||||
|
self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
|
||||||
|
|
||||||
|
# Replicate to RODC again with EXOP_REPL_SECRETS
|
||||||
|
req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
|
||||||
|
invocation_id=self.ldb_dc1.get_invocation_id(),
|
||||||
|
nc_dn_str=user_dn,
|
||||||
|
exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
|
||||||
|
partial_attribute_set=drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb),
|
||||||
|
max_objects=133,
|
||||||
|
replica_flags=0)
|
||||||
|
(level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
|
||||||
|
|
||||||
|
# This is important for Windows, because the entry won't have been
|
||||||
|
# updated in time if we don't have it. Even with this sleep, it only
|
||||||
|
# passes some of the time...
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
# Check that the entry in msDSRevealedUsers has been updated
|
||||||
|
(packed_attrs_3, unpacked_attrs_3) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
|
||||||
|
self._assert_attrlist_changed(unpacked_attrs_2, unpacked_attrs_3, expected_user_attributes)
|
||||||
|
|
||||||
|
# We should be able to delete the user
|
||||||
|
self.ldb_dc1.deleteuser(user_name)
|
||||||
|
|
||||||
|
res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
|
||||||
|
attrs=["msDS-RevealedUsers"])
|
||||||
|
self.assertFalse("msDS-RevealedUsers" in res[0])
|
||||||
|
|
||||||
|
def test_msDSRevealedUsers_pas(self):
|
||||||
|
"""
|
||||||
|
If we provide a Partial Attribute Set when replicating to an RODC,
|
||||||
|
we should ignore it and replicate all of the secret attributes anyway
|
||||||
|
msDSRevealedUsers attribute.
|
||||||
|
"""
|
||||||
|
rand = random.randint(1, 10000000)
|
||||||
|
expected_user_attributes = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
|
||||||
|
drsuapi.DRSUAPI_ATTID_supplementalCredentials,
|
||||||
|
drsuapi.DRSUAPI_ATTID_ntPwdHistory,
|
||||||
|
drsuapi.DRSUAPI_ATTID_unicodePwd,
|
||||||
|
drsuapi.DRSUAPI_ATTID_dBCSPwd]
|
||||||
|
pas_exceptions = [drsuapi.DRSUAPI_ATTID_lmPwdHistory,
|
||||||
|
drsuapi.DRSUAPI_ATTID_supplementalCredentials,
|
||||||
|
drsuapi.DRSUAPI_ATTID_ntPwdHistory,
|
||||||
|
drsuapi.DRSUAPI_ATTID_dBCSPwd]
|
||||||
|
|
||||||
|
# Add a user on DC1, add it to allowed password replication
|
||||||
|
# group, and replicate to RODC with EXOP_REPL_SECRETS
|
||||||
|
user_name = "test_rodcD_%s" % rand
|
||||||
|
password = "password12#"
|
||||||
|
user_dn = "CN=%s,%s" % (user_name, self.ou)
|
||||||
|
self.ldb_dc1.add({
|
||||||
|
"dn": user_dn,
|
||||||
|
"objectclass": "user",
|
||||||
|
"sAMAccountName": user_name
|
||||||
|
})
|
||||||
|
|
||||||
|
# Store some secret on this user
|
||||||
|
self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password, False, user_name)
|
||||||
|
|
||||||
|
self.ldb_dc1.add_remove_group_members("Allowed RODC Password Replication Group",
|
||||||
|
[user_name],
|
||||||
|
add_members_operation=True)
|
||||||
|
|
||||||
|
pas = drs_get_rodc_partial_attribute_set(self.ldb_dc1, self.tmp_samdb, exceptions=pas_exceptions)
|
||||||
|
req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
|
||||||
|
invocation_id=self.ldb_dc1.get_invocation_id(),
|
||||||
|
nc_dn_str=user_dn,
|
||||||
|
exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
|
||||||
|
partial_attribute_set=pas,
|
||||||
|
max_objects=133,
|
||||||
|
replica_flags=0)
|
||||||
|
(level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
|
||||||
|
|
||||||
|
# Make sure that we still replicate the secrets
|
||||||
|
for attribute in ctr.first_object.object.attribute_ctr.attributes:
|
||||||
|
if attribute.attid in pas_exceptions:
|
||||||
|
pas_exceptions.remove(attribute.attid)
|
||||||
|
for attribute in pas_exceptions:
|
||||||
|
self.fail("%d was not replicated even though the partial attribute set should be ignored."
|
||||||
|
% attribute)
|
||||||
|
|
||||||
|
# Check that the user has been added to msDSRevealedUsers
|
||||||
|
(packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
|
||||||
|
|
||||||
|
def _assert_in_revealed_users(self, user_dn, attrlist):
|
||||||
|
res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
|
||||||
|
attrs=["msDS-RevealedUsers"])
|
||||||
|
revealed_users = res[0]["msDS-RevealedUsers"]
|
||||||
|
actual_attrids = []
|
||||||
|
packed_attrs = []
|
||||||
|
unpacked_attrs = []
|
||||||
|
for attribute in revealed_users:
|
||||||
|
dsdb_dn = dsdb_Dn(self.ldb_dc1, attribute)
|
||||||
|
metadata = ndr_unpack(drsblobs.replPropertyMetaData1, dsdb_dn.get_bytes())
|
||||||
|
if user_dn in attribute:
|
||||||
|
unpacked_attrs.append(metadata)
|
||||||
|
packed_attrs.append(dsdb_dn.get_bytes())
|
||||||
|
actual_attrids.append(metadata.attid)
|
||||||
|
|
||||||
|
self.assertEquals(sorted(actual_attrids), sorted(attrlist))
|
||||||
|
|
||||||
|
return (packed_attrs, unpacked_attrs)
|
||||||
|
|
||||||
|
def _assert_attrlist_equals(self, list_1, list_2):
|
||||||
|
return self._assert_attrlist_changed(list_1, list_2, [], num_changes=0, expected_new_usn=False)
|
||||||
|
|
||||||
|
def _assert_attrlist_changed(self, list_1, list_2, changed_attributes, num_changes=1, expected_new_usn=True):
|
||||||
|
for i in range(len(list_2)):
|
||||||
|
self.assertEquals(list_1[i].attid, list_2[i].attid)
|
||||||
|
self.assertEquals(list_1[i].originating_invocation_id, list_2[i].originating_invocation_id)
|
||||||
|
self.assertEquals(list_1[i].version + num_changes, list_2[i].version)
|
||||||
|
|
||||||
|
if expected_new_usn:
|
||||||
|
self.assertTrue(list_1[i].originating_usn < list_2[i].originating_usn)
|
||||||
|
self.assertTrue(list_1[i].local_usn < list_2[i].local_usn)
|
||||||
|
else:
|
||||||
|
self.assertEquals(list_1[i].originating_usn, list_2[i].originating_usn)
|
||||||
|
self.assertEquals(list_1[i].local_usn, list_2[i].local_usn)
|
||||||
|
|
||||||
|
if list_1[i].attid in changed_attributes:
|
||||||
|
# We do the changes too quickly, so unless we put sleeps
|
||||||
|
# inbetween calls, these remain the same. Checking the USNs
|
||||||
|
# is enough.
|
||||||
|
pass
|
||||||
|
#self.assertTrue(list_1[i].originating_change_time < list_2[i].originating_change_time)
|
||||||
|
else:
|
||||||
|
self.assertEquals(list_1[i].originating_change_time, list_2[i].originating_change_time)
|
||||||
|
|
||||||
|
|
||||||
|
def _create_rodc(self, ctx):
|
||||||
|
ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
|
||||||
|
ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
|
||||||
|
ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
|
||||||
|
|
||||||
|
ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
|
||||||
|
"<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
|
||||||
|
"<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
|
||||||
|
"<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
|
||||||
|
"<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
|
||||||
|
ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
|
||||||
|
|
||||||
|
mysid = ctx.get_mysid()
|
||||||
|
admin_dn = "<SID=%s>" % mysid
|
||||||
|
ctx.managedby = admin_dn
|
||||||
|
|
||||||
|
ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
|
||||||
|
samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
|
||||||
|
samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
|
||||||
|
|
||||||
|
ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
|
||||||
|
ctx.secure_channel_type = misc.SEC_CHAN_RODC
|
||||||
|
ctx.RODC = True
|
||||||
|
ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
|
||||||
|
drsuapi.DRSUAPI_DRS_PER_SYNC |
|
||||||
|
drsuapi.DRSUAPI_DRS_GET_ANC |
|
||||||
|
drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
|
||||||
|
drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
|
||||||
|
|
||||||
|
ctx.join_add_objects()
|
Loading…
x
Reference in New Issue
Block a user