mirror of
https://github.com/samba-team/samba.git
synced 2025-01-22 22:04:08 +03:00
s4-dsdb: add unit tests for dirsync control
Signed-off-by: Andrew Tridgell <tridge@samba.org>
This commit is contained in:
parent
fa400af18b
commit
c2fa348831
713
source4/dsdb/tests/python/dirsync.py
Executable file
713
source4/dsdb/tests/python/dirsync.py
Executable file
@ -0,0 +1,713 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Unit tests for dirsync control
|
||||
# Copyright (C) Matthieu Patou <mat@matws.net> 2011
|
||||
#
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
import optparse
|
||||
import sys
|
||||
sys.path.insert(0, "bin/python")
|
||||
import samba
|
||||
samba.ensure_external_module("testtools", "testtools")
|
||||
samba.ensure_external_module("subunit", "subunit/python")
|
||||
|
||||
import samba.getopt as options
|
||||
import base64
|
||||
|
||||
from ldb import LdbError, SCOPE_BASE
|
||||
from ldb import Message, MessageElement, Dn
|
||||
from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
|
||||
from samba.dcerpc import security, misc, drsblobs
|
||||
from samba.ndr import ndr_unpack, ndr_pack
|
||||
|
||||
from samba.auth import system_session
|
||||
from samba import gensec, sd_utils
|
||||
from samba.samdb import SamDB
|
||||
from samba.credentials import Credentials
|
||||
import samba.tests
|
||||
from samba.tests import delete_force
|
||||
from subunit.run import SubunitTestRunner
|
||||
import unittest
|
||||
|
||||
parser = optparse.OptionParser("dirsync.py [options] <host>")
|
||||
sambaopts = options.SambaOptions(parser)
|
||||
parser.add_option_group(sambaopts)
|
||||
parser.add_option_group(options.VersionOptions(parser))
|
||||
|
||||
# use command line creds if available
|
||||
credopts = options.CredentialsOptions(parser)
|
||||
parser.add_option_group(credopts)
|
||||
opts, args = parser.parse_args()
|
||||
|
||||
if len(args) < 1:
|
||||
parser.print_usage()
|
||||
sys.exit(1)
|
||||
|
||||
host = args[0]
|
||||
if not "://" in host:
|
||||
ldaphost = "ldap://%s" % host
|
||||
ldapshost = "ldaps://%s" % host
|
||||
else:
|
||||
ldaphost = host
|
||||
start = host.rindex("://")
|
||||
host = host.lstrip(start+3)
|
||||
|
||||
lp = sambaopts.get_loadparm()
|
||||
creds = credopts.get_credentials(lp)
|
||||
|
||||
#
|
||||
# Tests start here
|
||||
#
|
||||
|
||||
class DirsyncBaseTests(samba.tests.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(DirsyncBaseTests, self).setUp()
|
||||
self.ldb_admin = ldb
|
||||
self.base_dn = ldb.domain_dn()
|
||||
self.domain_sid = security.dom_sid(ldb.get_domain_sid())
|
||||
self.user_pass = "samba123@AAA"
|
||||
self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
|
||||
self.sd_utils = sd_utils.SDUtils(ldb)
|
||||
#used for anonymous login
|
||||
print "baseDN: %s" % self.base_dn
|
||||
|
||||
def get_user_dn(self, name):
|
||||
return "CN=%s,CN=Users,%s" % (name, self.base_dn)
|
||||
|
||||
def get_ldb_connection(self, target_username, target_password):
|
||||
creds_tmp = Credentials()
|
||||
creds_tmp.set_username(target_username)
|
||||
creds_tmp.set_password(target_password)
|
||||
creds_tmp.set_domain(creds.get_domain())
|
||||
creds_tmp.set_realm(creds.get_realm())
|
||||
creds_tmp.set_workstation(creds.get_workstation())
|
||||
creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
|
||||
| gensec.FEATURE_SEAL)
|
||||
ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
|
||||
return ldb_target
|
||||
|
||||
|
||||
#tests on ldap add operations
|
||||
class SimpleDirsyncTests(DirsyncBaseTests):
|
||||
|
||||
def setUp(self):
|
||||
super(SimpleDirsyncTests, self).setUp()
|
||||
# Regular user
|
||||
self.dirsync_user = "test_dirsync_user"
|
||||
self.simple_user = "test_simple_user"
|
||||
self.admin_user = "test_admin_user"
|
||||
self.ouname = None
|
||||
|
||||
self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
|
||||
self.ldb_admin.newuser(self.simple_user, self.user_pass)
|
||||
self.ldb_admin.newuser(self.admin_user, self.user_pass)
|
||||
self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
|
||||
|
||||
user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
|
||||
mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
|
||||
self.sd_utils.dacl_add_ace(self.base_dn, mod)
|
||||
|
||||
# add admins to the Domain Admins group
|
||||
self.ldb_admin.add_remove_group_members("Domain Admins", self.admin_user,
|
||||
add_members_operation=True)
|
||||
|
||||
def tearDown(self):
|
||||
super(SimpleDirsyncTests, self).tearDown()
|
||||
delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
|
||||
delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
|
||||
delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
|
||||
if self.ouname:
|
||||
delete_force(self.ldb_admin, self.ouname)
|
||||
self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
|
||||
try:
|
||||
self.ldb_admin.deletegroup("testgroup")
|
||||
except:
|
||||
pass
|
||||
|
||||
#def test_dirsync_errors(self):
|
||||
|
||||
|
||||
def test_dirsync_supported(self):
|
||||
"""Test the basic of the dirsync is supported"""
|
||||
self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
|
||||
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
|
||||
res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
|
||||
res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
|
||||
try:
|
||||
self.ldb_simple.search(self.base_dn,
|
||||
expression="samaccountname=*",
|
||||
controls=["dirsync:1:0:1"])
|
||||
except LdbError,l:
|
||||
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
|
||||
|
||||
def test_parentGUID_referrals(self):
|
||||
res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
|
||||
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="name=Configuration",
|
||||
controls=["dirsync:1:0:1"])
|
||||
self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID"))
|
||||
|
||||
def test_ok_not_rootdc(self):
|
||||
"""Test if it's ok to do dirsync on another NC that is not the root DC"""
|
||||
try:
|
||||
res = self.ldb_admin.search("CN=Configuration, %s" % self.base_dn,
|
||||
expression="samaccountname=*",
|
||||
controls=["dirsync:1:0:1"])
|
||||
except:
|
||||
self.assertTrue(False)
|
||||
|
||||
def test_dirsync_errors(self):
|
||||
"""Test if dirsync returns the correct LDAP errors in case of pb"""
|
||||
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
|
||||
self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
|
||||
try:
|
||||
self.ldb_simple.search(self.base_dn,
|
||||
expression="samaccountname=*",
|
||||
controls=["dirsync:1:0:1"])
|
||||
except LdbError,l:
|
||||
print l
|
||||
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
|
||||
|
||||
try:
|
||||
self.ldb_simple.search("CN=Users,%s" % self.base_dn,
|
||||
expression="samaccountname=*",
|
||||
controls=["dirsync:1:0:1"])
|
||||
except LdbError,l:
|
||||
print l
|
||||
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
|
||||
|
||||
try:
|
||||
self.ldb_simple.search("CN=Users,%s" % self.base_dn,
|
||||
expression="samaccountname=*",
|
||||
controls=["dirsync:1:1:1"])
|
||||
except LdbError,l:
|
||||
print l
|
||||
self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
|
||||
|
||||
try:
|
||||
self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
|
||||
expression="samaccountname=*",
|
||||
controls=["dirsync:1:0:1"])
|
||||
except LdbError,l:
|
||||
print l
|
||||
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
|
||||
|
||||
try:
|
||||
self.ldb_admin.search("CN=Users,%s" % self.base_dn,
|
||||
expression="samaccountname=*",
|
||||
controls=["dirsync:1:0:1"])
|
||||
except LdbError,l:
|
||||
print l
|
||||
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
|
||||
|
||||
try:
|
||||
self.ldb_admin.search("CN=Users,%s" % self.base_dn,
|
||||
expression="samaccountname=*",
|
||||
controls=["dirsync:1:1:1"])
|
||||
except LdbError,l:
|
||||
print l
|
||||
self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
|
||||
|
||||
|
||||
|
||||
|
||||
def test_dirsync_attributes(self):
|
||||
"""Check behavior with some attributes """
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="samaccountname=*",
|
||||
controls=["dirsync:1:0:1"])
|
||||
# Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
|
||||
self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") != None)
|
||||
# Check that non replicated attributes are not returned
|
||||
self.assertTrue(res.msgs[0].get("badPwdCount") == None)
|
||||
# Check that non forward link are not returned
|
||||
self.assertTrue(res.msgs[0].get("memberof") == None)
|
||||
|
||||
# Asking for instanceType will return also objectGUID
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="samaccountname=Administrator",
|
||||
attrs=["instanceType"],
|
||||
controls=["dirsync:1:0:1"])
|
||||
self.assertTrue(res.msgs[0].get("objectGUID") != None)
|
||||
self.assertTrue(res.msgs[0].get("instanceType") != None)
|
||||
|
||||
# We don't return an entry if asked for objectGUID
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="dn=%s" % self.base_dn,
|
||||
attrs=["objectGUID"],
|
||||
controls=["dirsync:1:0:1"])
|
||||
self.assertEquals(len(res.msgs), 0)
|
||||
|
||||
# a request on the root of a NC didn't return parentGUID
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="dn=%s" % self.base_dn,
|
||||
attrs=["name"],
|
||||
controls=["dirsync:1:0:1"])
|
||||
self.assertTrue(res.msgs[0].get("objectGUID") != None)
|
||||
self.assertTrue(res.msgs[0].get("name") != None)
|
||||
self.assertTrue(res.msgs[0].get("parentGUID") == None)
|
||||
self.assertTrue(res.msgs[0].get("instanceType") != None)
|
||||
|
||||
# Asking for name will return also objectGUID and parentGUID
|
||||
# and instanceType and of course name
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="samaccountname=Administrator",
|
||||
attrs=["name"],
|
||||
controls=["dirsync:1:0:1"])
|
||||
self.assertTrue(res.msgs[0].get("objectGUID") != None)
|
||||
self.assertTrue(res.msgs[0].get("name") != None)
|
||||
self.assertTrue(res.msgs[0].get("parentGUID") != None)
|
||||
self.assertTrue(res.msgs[0].get("instanceType") != None)
|
||||
|
||||
# Asking for dn will not return not only DN but more like if attrs=*
|
||||
# parentGUID should be returned
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="samaccountname=Administrator",
|
||||
attrs=["dn"],
|
||||
controls=["dirsync:1:0:1"])
|
||||
count = len(res.msgs[0])
|
||||
res2 = self.ldb_admin.search(self.base_dn,
|
||||
expression="samaccountname=Administrator",
|
||||
controls=["dirsync:1:0:1"])
|
||||
count2 = len(res2.msgs[0])
|
||||
self.assertEqual(count, count2)
|
||||
|
||||
# Asking for cn will return nothing on objects that have CN as RDN
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="samaccountname=Administrator",
|
||||
attrs=["cn"],
|
||||
controls=["dirsync:1:0:1"])
|
||||
self.assertEqual(len(res.msgs), 0)
|
||||
# Asking for parentGUID will return nothing too
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="samaccountname=Administrator",
|
||||
attrs=["parentGUID"],
|
||||
controls=["dirsync:1:0:1"])
|
||||
self.assertEqual(len(res.msgs), 0)
|
||||
ouname="OU=testou,%s" % self.base_dn
|
||||
self.ouname = ouname
|
||||
self.ldb_admin.create_ou(ouname)
|
||||
delta = Message()
|
||||
delta.dn = Dn(self.ldb_admin, str(ouname))
|
||||
delta["cn"] = MessageElement("test ou",
|
||||
FLAG_MOD_ADD,
|
||||
"cn" )
|
||||
self.ldb_admin.modify(delta)
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="name=testou",
|
||||
attrs=["cn"],
|
||||
controls=["dirsync:1:0:1"])
|
||||
|
||||
self.assertEqual(len(res.msgs), 1)
|
||||
self.assertEqual(len(res.msgs[0]), 3)
|
||||
delete_force(self.ldb_admin, ouname)
|
||||
|
||||
def test_dirsync_with_controls(self):
|
||||
"""Check that dirsync return correct informations when dealing with the NC"""
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(dn=%s)" % str(self.base_dn),
|
||||
attrs=["name"],
|
||||
controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
|
||||
|
||||
def test_dirsync_basenc(self):
|
||||
"""Check that dirsync return correct informations when dealing with the NC"""
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(dn=%s)" % str(self.base_dn),
|
||||
attrs=["name"],
|
||||
controls=["dirsync:1:0:10000"])
|
||||
self.assertEqual(len(res.msgs), 1)
|
||||
self.assertEqual(len(res.msgs[0]), 3)
|
||||
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(dn=%s)" % str(self.base_dn),
|
||||
attrs=["ntSecurityDescriptor"],
|
||||
controls=["dirsync:1:0:10000"])
|
||||
self.assertEqual(len(res.msgs), 1)
|
||||
self.assertEqual(len(res.msgs[0]), 3)
|
||||
|
||||
def test_dirsync_othernc(self):
|
||||
"""Check that dirsync return information for entries that are normaly referrals (ie. other NCs)"""
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(objectclass=configuration)",
|
||||
attrs=["name"],
|
||||
controls=["dirsync:1:0:10000"])
|
||||
self.assertEqual(len(res.msgs), 1)
|
||||
self.assertEqual(len(res.msgs[0]), 4)
|
||||
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(objectclass=configuration)",
|
||||
attrs=["ntSecurityDescriptor"],
|
||||
controls=["dirsync:1:0:10000"])
|
||||
self.assertEqual(len(res.msgs), 1)
|
||||
self.assertEqual(len(res.msgs[0]), 3)
|
||||
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(objectclass=domaindns)",
|
||||
attrs=["ntSecurityDescriptor"],
|
||||
controls=["dirsync:1:0:10000"])
|
||||
nb = len(res.msgs)
|
||||
|
||||
# only sub nc returns a result when asked for objectGUID
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(objectclass=domaindns)",
|
||||
attrs=["objectGUID"],
|
||||
controls=["dirsync:1:0:0"])
|
||||
self.assertEqual(len(res.msgs), nb - 1)
|
||||
if nb > 1:
|
||||
self.assertTrue(res.msgs[0].get("objectGUID") != None)
|
||||
else:
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(objectclass=configuration)",
|
||||
attrs=["objectGUID"],
|
||||
controls=["dirsync:1:0:0"])
|
||||
|
||||
|
||||
def test_dirsync_send_delta(self):
|
||||
"""Check that dirsync return correct delta when sending the last cookie"""
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(&(samaccountname=test*)(!(isDeleted=*)))",
|
||||
controls=["dirsync:1:0:10000"])
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "0"
|
||||
ctl[3] = "10000"
|
||||
control = str(":".join(ctl))
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(&(samaccountname=test*)(!(isDeleted=*)))",
|
||||
controls=[control])
|
||||
self.assertEqual(len(res), 0)
|
||||
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
|
||||
controls=["dirsync:1:0:100000"])
|
||||
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "0"
|
||||
ctl[3] = "10000"
|
||||
control2 = str(":".join(ctl))
|
||||
|
||||
# Let's create an OU
|
||||
ouname="OU=testou2,%s" % self.base_dn
|
||||
self.ouname = ouname
|
||||
self.ldb_admin.create_ou(ouname)
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
|
||||
controls=[control2])
|
||||
self.assertEqual(len(res), 1)
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "0"
|
||||
ctl[3] = "10000"
|
||||
control3 = str(":".join(ctl))
|
||||
|
||||
delta = Message()
|
||||
delta.dn = Dn(self.ldb_admin, str(ouname))
|
||||
|
||||
delta["cn"] = MessageElement("test ou",
|
||||
FLAG_MOD_ADD,
|
||||
"cn" )
|
||||
self.ldb_admin.modify(delta)
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
|
||||
controls=[control3])
|
||||
|
||||
self.assertEqual(len(res.msgs), 1)
|
||||
# 3 attributes: instanceType, cn and objectGUID
|
||||
self.assertEqual(len(res.msgs[0]), 3)
|
||||
|
||||
delta = Message()
|
||||
delta.dn = Dn(self.ldb_admin, str(ouname))
|
||||
delta["cn"] = MessageElement([],
|
||||
FLAG_MOD_DELETE,
|
||||
"cn" )
|
||||
self.ldb_admin.modify(delta)
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
|
||||
controls=[control3])
|
||||
|
||||
self.assertEqual(len(res.msgs), 1)
|
||||
# So we won't have much attribute returned but instanceType and GUID
|
||||
# are.
|
||||
# 3 attributes: instanceType and objectGUID and cn but empty
|
||||
self.assertEqual(len(res.msgs[0]), 3)
|
||||
ouname = "OU=newouname,%s" % self.base_dn
|
||||
self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
|
||||
self.ouname = ouname
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "0"
|
||||
ctl[3] = "10000"
|
||||
control4 = str(":".join(ctl))
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
|
||||
controls=[control3])
|
||||
|
||||
self.assertTrue(res[0].get("parentGUID") != None)
|
||||
self.assertTrue(res[0].get("name") != None)
|
||||
delete_force(self.ldb_admin, ouname)
|
||||
|
||||
def test_dirsync_linkedattributes(self):
|
||||
"""Check that dirsync returnd deleted objects too"""
|
||||
# Let's search for members
|
||||
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
|
||||
res = self.ldb_simple.search(self.base_dn,
|
||||
expression="(name=Administrators)",
|
||||
controls=["dirsync:1:1:1"])
|
||||
|
||||
self.assertTrue(len(res[0].get("member")) > 0)
|
||||
size = len(res[0].get("member"))
|
||||
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "1"
|
||||
ctl[3] = "10000"
|
||||
control1 = str(":".join(ctl))
|
||||
self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
|
||||
add_members_operation=True)
|
||||
|
||||
res = self.ldb_simple.search(self.base_dn,
|
||||
expression="(name=Administrators)",
|
||||
controls=[control1])
|
||||
|
||||
self.assertEqual(len(res[0].get("member")), size + 1)
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "1"
|
||||
ctl[3] = "10000"
|
||||
control1 = str(":".join(ctl))
|
||||
|
||||
# remove the user from the group
|
||||
self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
|
||||
add_members_operation=False)
|
||||
|
||||
res = self.ldb_simple.search(self.base_dn,
|
||||
expression="(name=Administrators)",
|
||||
controls=[control1])
|
||||
|
||||
self.assertEqual(len(res[0].get("member")), size )
|
||||
|
||||
self.ldb_admin.newgroup("testgroup")
|
||||
self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
|
||||
add_members_operation=True)
|
||||
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(name=testgroup)",
|
||||
controls=["dirsync:1:0:1"])
|
||||
|
||||
self.assertEqual(len(res[0].get("member")), 1)
|
||||
self.assertTrue(res[0].get("member") != "" )
|
||||
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "0"
|
||||
ctl[3] = "1"
|
||||
control1 = str(":".join(ctl))
|
||||
|
||||
# Check that reasking the same question but with an updated cookie
|
||||
# didn't return any results.
|
||||
print control1
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(name=testgroup)",
|
||||
controls=[control1])
|
||||
self.assertEqual(len(res), 0)
|
||||
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "1"
|
||||
ctl[3] = "10000"
|
||||
control1 = str(":".join(ctl))
|
||||
|
||||
self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
|
||||
add_members_operation=False)
|
||||
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(name=testgroup)",
|
||||
attrs=["member"],
|
||||
controls=[control1])
|
||||
|
||||
self.ldb_admin.deletegroup("testgroup")
|
||||
self.assertEqual(len(res[0].get("member")), 0)
|
||||
|
||||
|
||||
|
||||
def test_dirsync_deleted_items(self):
|
||||
"""Check that dirsync returnd deleted objects too"""
|
||||
# Let's create an OU
|
||||
ouname="OU=testou3,%s" % self.base_dn
|
||||
self.ouname = ouname
|
||||
self.ldb_admin.create_ou(ouname)
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
|
||||
controls=["dirsync:1:0:1"])
|
||||
guid = None
|
||||
for e in res:
|
||||
if str(e["name"]) == "testou3":
|
||||
guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
|
||||
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "0"
|
||||
ctl[3] = "10000"
|
||||
control1 = str(":".join(ctl))
|
||||
|
||||
# So now delete the object and check that
|
||||
# we can see the object but deleted when admin
|
||||
delete_force(self.ldb_admin, ouname)
|
||||
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(objectClass=organizationalUnit)",
|
||||
controls=[control1])
|
||||
self.assertEqual(len(res), 1)
|
||||
guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
|
||||
self.assertEqual(guid2, guid)
|
||||
self.assertTrue(res[0].get("isDeleted"))
|
||||
self.assertTrue(res[0].get("name") != None)
|
||||
|
||||
def test_cookie_from_others(self):
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
|
||||
controls=["dirsync:1:0:1"])
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
|
||||
cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
|
||||
controls=["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie))]
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
|
||||
controls=controls)
|
||||
|
||||
class ExtendedDirsyncTests(SimpleDirsyncTests):
|
||||
def test_dirsync_linkedattributes(self):
|
||||
flag_incr_linked = 2147483648
|
||||
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
attrs=["member"],
|
||||
expression="(name=Administrators)",
|
||||
controls=["dirsync:1:%d:1" % flag_incr_linked])
|
||||
|
||||
self.assertTrue(res[0].get("member;range=1-1") != None )
|
||||
self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
|
||||
size = len(res[0].get("member;range=1-1"))
|
||||
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "%d" % flag_incr_linked
|
||||
ctl[3] = "10000"
|
||||
control1 = str(":".join(ctl))
|
||||
self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
|
||||
add_members_operation=True)
|
||||
self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
|
||||
add_members_operation=True)
|
||||
|
||||
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(name=Administrators)",
|
||||
controls=[control1])
|
||||
|
||||
self.assertEqual(len(res[0].get("member;range=1-1")), 2)
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "%d" % flag_incr_linked
|
||||
ctl[3] = "10000"
|
||||
control1 = str(":".join(ctl))
|
||||
|
||||
# remove the user from the group
|
||||
self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
|
||||
add_members_operation=False)
|
||||
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(name=Administrators)",
|
||||
controls=[control1])
|
||||
|
||||
self.assertEqual(res[0].get("member;range=1-1"), None )
|
||||
self.assertEqual(len(res[0].get("member;range=0-0")), 1)
|
||||
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "%d" % flag_incr_linked
|
||||
ctl[3] = "10000"
|
||||
control2 = str(":".join(ctl))
|
||||
|
||||
self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
|
||||
add_members_operation=False)
|
||||
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(name=Administrators)",
|
||||
controls=[control2])
|
||||
|
||||
self.assertEqual(res[0].get("member;range=1-1"), None )
|
||||
self.assertEqual(len(res[0].get("member;range=0-0")), 1)
|
||||
|
||||
res = self.ldb_admin.search(self.base_dn,
|
||||
expression="(name=Administrators)",
|
||||
controls=[control1])
|
||||
|
||||
self.assertEqual(res[0].get("member;range=1-1"), None )
|
||||
self.assertEqual(len(res[0].get("member;range=0-0")), 2)
|
||||
|
||||
def test_dirsync_deleted_items(self):
|
||||
"""Check that dirsync returnd deleted objects too"""
|
||||
# Let's create an OU
|
||||
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
|
||||
ouname="OU=testou3,%s" % self.base_dn
|
||||
self.ouname = ouname
|
||||
self.ldb_admin.create_ou(ouname)
|
||||
|
||||
# Specify LDAP_DIRSYNC_OBJECT_SECURITY
|
||||
res = self.ldb_simple.search(self.base_dn,
|
||||
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
|
||||
controls=["dirsync:1:1:1"])
|
||||
|
||||
guid = None
|
||||
for e in res:
|
||||
if str(e["name"]) == "testou3":
|
||||
guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
|
||||
|
||||
self.assertTrue(guid != None)
|
||||
ctl = str(res.controls[0]).split(":")
|
||||
ctl[1] = "1"
|
||||
ctl[2] = "1"
|
||||
ctl[3] = "10000"
|
||||
control1 = str(":".join(ctl))
|
||||
|
||||
# So now delete the object and check that
|
||||
# we can see the object but deleted when admin
|
||||
# we just see the objectGUID when simple user
|
||||
delete_force(self.ldb_admin, ouname)
|
||||
|
||||
res = self.ldb_simple.search(self.base_dn,
|
||||
expression="(objectClass=organizationalUnit)",
|
||||
controls=[control1])
|
||||
self.assertEqual(len(res), 1)
|
||||
guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
|
||||
self.assertEqual(guid2, guid)
|
||||
self.assertEqual(str(res[0].dn), "")
|
||||
|
||||
|
||||
ldb = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
|
||||
|
||||
runner = SubunitTestRunner()
|
||||
rc = 0
|
||||
#
|
||||
if not runner.run(unittest.makeSuite(SimpleDirsyncTests)).wasSuccessful():
|
||||
rc = 1
|
||||
if not runner.run(unittest.makeSuite(ExtendedDirsyncTests)).wasSuccessful():
|
||||
rc = 1
|
||||
|
||||
sys.exit(rc)
|
@ -83,3 +83,5 @@ samba4.smb2.compound.*.invalid2
|
||||
samba4.ldap.acl.*.search.* # ACL search behaviour not enabled by default
|
||||
samba4.ldap.acl.*.ntSecurityDescriptor.* # ACL extended checks on search not enabled by default
|
||||
samba4.nbt.winsreplication.owned # fails sometimes, timing related
|
||||
samba4.ldap.dirsync.python.dc..__main__.ExtendedDirsyncTests.test_dirsync_deleted_items
|
||||
#samba4.ldap.dirsync.python.dc..__main__.ExtendedDirsyncTests.*
|
||||
|
@ -388,6 +388,7 @@ plantestsuite("samba4.tokengroups.python(dc)", "dc:local", [python, os.path.join
|
||||
plantestsuite("samba4.sam.python(dc)", "dc", [python, os.path.join(samba4srcdir, "dsdb/tests/python/sam.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '-W', '$DOMAIN'])
|
||||
plansambapythontestsuite("samba4.schemaInfo.python(dc)", "dc", os.path.join(samba4srcdir, 'dsdb/tests/python'), 'dsdb_schema_info', extra_args=['-U"$DOMAIN/$DC_USERNAME%$DC_PASSWORD"'])
|
||||
plantestsuite("samba4.urgent_replication.python(dc)", "dc", [python, os.path.join(samba4srcdir, "dsdb/tests/python/urgent_replication.py"), '$PREFIX_ABS/dc/private/sam.ldb'], allow_empty_output=True)
|
||||
plantestsuite("samba4.ldap.dirsync.python(dc)", "dc", [python, os.path.join(samba4srcdir, "dsdb/tests/python/dirsync.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '-W', '$DOMAIN'])
|
||||
for env in ["dc", "fl2000dc", "fl2003dc", "fl2008r2dc"]:
|
||||
plantestsuite("samba4.ldap_schema.python(%s)" % env, env, [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap_schema.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '-W', '$DOMAIN'])
|
||||
plantestsuite("samba4.ldap.possibleInferiors.python(%s)" % env, env, [python, os.path.join(samba4srcdir, "dsdb/samdb/ldb_modules/tests/possibleinferiors.py"), "ldap://$SERVER", '-U"$USERNAME%$PASSWORD"', "-W", "$DOMAIN"])
|
||||
|
Loading…
x
Reference in New Issue
Block a user