1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-07 17:18:11 +03:00
samba-mirror/source4/dsdb/tests/python/dirsync.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

764 lines
34 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
#
# Unit tests for dirsync control
# Copyright (C) Matthieu Patou <mat@matws.net> 2011
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import optparse
import sys
sys.path.insert(0, "bin/python")
import samba
from samba.tests.subunitrun import TestProgram, SubunitOptions
import samba.getopt as options
import base64
import ldb
from ldb import LdbError, SCOPE_BASE
from ldb import Message, MessageElement, Dn
from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
from samba.dcerpc import security, misc, drsblobs
from samba.ndr import ndr_unpack, ndr_pack
from samba.auth import system_session
from samba import gensec, sd_utils
from samba.samdb import SamDB
from samba.credentials import Credentials, DONT_USE_KERBEROS
import samba.tests
from samba.tests import delete_force
parser = optparse.OptionParser("dirsync.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()
if len(args) < 1:
parser.print_usage()
sys.exit(1)
host = args.pop()
if "://" not in host:
ldaphost = "ldap://%s" % host
ldapshost = "ldaps://%s" % host
else:
ldaphost = host
start = host.rindex("://")
host = host.lstrip(start + 3)
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
#
# Tests start here
#
class DirsyncBaseTests(samba.tests.TestCase):
def setUp(self):
super(DirsyncBaseTests, self).setUp()
self.ldb_admin = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
self.base_dn = self.ldb_admin.domain_dn()
self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
self.user_pass = samba.generate_random_password(12, 16)
self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
# used for anonymous login
print("baseDN: %s" % self.base_dn)
def get_user_dn(self, name):
return "CN=%s,CN=Users,%s" % (name, self.base_dn)
def get_ldb_connection(self, target_username, target_password):
creds_tmp = Credentials()
creds_tmp.set_username(target_username)
creds_tmp.set_password(target_password)
creds_tmp.set_domain(creds.get_domain())
creds_tmp.set_realm(creds.get_realm())
creds_tmp.set_workstation(creds.get_workstation())
creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
| gensec.FEATURE_SEAL)
creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
return ldb_target
# tests on ldap add operations
class SimpleDirsyncTests(DirsyncBaseTests):
def setUp(self):
super(SimpleDirsyncTests, self).setUp()
# Regular user
self.dirsync_user = "test_dirsync_user"
self.simple_user = "test_simple_user"
self.admin_user = "test_admin_user"
self.ouname = None
self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
self.ldb_admin.newuser(self.simple_user, self.user_pass)
self.ldb_admin.newuser(self.admin_user, self.user_pass)
self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
str(user_sid))
self.sd_utils.dacl_add_ace(self.base_dn, mod)
# add admins to the Domain Admins group
self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
add_members_operation=True)
def tearDown(self):
super(SimpleDirsyncTests, self).tearDown()
delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
if self.ouname:
delete_force(self.ldb_admin, self.ouname)
self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
# def test_dirsync_errors(self):
def test_dirsync_supported(self):
"""Test the basic of the dirsync is supported"""
self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
try:
self.ldb_simple.search(self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
except LdbError as l:
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
def test_parentGUID_referrals(self):
res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
res = self.ldb_admin.search(self.base_dn,
expression="name=Configuration",
controls=["dirsync:1:0:1"])
self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID"))
def test_ok_not_rootdc(self):
"""Test if it's ok to do dirsync on another NC that is not the root DC"""
self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
def test_dirsync_errors(self):
"""Test if dirsync returns the correct LDAP errors in case of pb"""
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
try:
self.ldb_simple.search(self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
except LdbError as l:
print(l)
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
try:
self.ldb_simple.search("CN=Users,%s" % self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
except LdbError as l:
print(l)
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
try:
self.ldb_simple.search("CN=Users,%s" % self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:1:1"])
except LdbError as l:
print(l)
self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
try:
self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
except LdbError as l:
print(l)
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
try:
self.ldb_admin.search("CN=Users,%s" % self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
except LdbError as l:
print(l)
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
try:
self.ldb_admin.search("CN=Users,%s" % self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:1:1"])
except LdbError as l:
print(l)
self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
def test_dirsync_attributes(self):
"""Check behavior with some attributes """
res = self.ldb_admin.search(self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
# Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") is not None)
# Check that non replicated attributes are not returned
self.assertTrue(res.msgs[0].get("badPwdCount") is None)
# Check that non forward link are not returned
self.assertTrue(res.msgs[0].get("memberof") is None)
# Asking for instanceType will return also objectGUID
res = self.ldb_admin.search(self.base_dn,
expression="samaccountname=Administrator",
attrs=["instanceType"],
controls=["dirsync:1:0:1"])
self.assertTrue(res.msgs[0].get("objectGUID") is not None)
self.assertTrue(res.msgs[0].get("instanceType") is not None)
# We don't return an entry if asked for objectGUID
res = self.ldb_admin.search(self.base_dn,
expression="(distinguishedName=%s)" % str(self.base_dn),
attrs=["objectGUID"],
controls=["dirsync:1:0:1"])
self.assertEqual(len(res.msgs), 0)
# a request on the root of a NC didn't return parentGUID
res = self.ldb_admin.search(self.base_dn,
expression="(distinguishedName=%s)" % str(self.base_dn),
attrs=["name"],
controls=["dirsync:1:0:1"])
self.assertTrue(res.msgs[0].get("objectGUID") is not None)
self.assertTrue(res.msgs[0].get("name") is not None)
self.assertTrue(res.msgs[0].get("parentGUID") is None)
self.assertTrue(res.msgs[0].get("instanceType") is not None)
# Asking for name will return also objectGUID and parentGUID
# and instanceType and of course name
res = self.ldb_admin.search(self.base_dn,
expression="samaccountname=Administrator",
attrs=["name"],
controls=["dirsync:1:0:1"])
self.assertTrue(res.msgs[0].get("objectGUID") is not None)
self.assertTrue(res.msgs[0].get("name") is not None)
self.assertTrue(res.msgs[0].get("parentGUID") is not None)
self.assertTrue(res.msgs[0].get("instanceType") is not None)
# Asking for dn will not return not only DN but more like if attrs=*
# parentGUID should be returned
res = self.ldb_admin.search(self.base_dn,
expression="samaccountname=Administrator",
attrs=["dn"],
controls=["dirsync:1:0:1"])
count = len(res.msgs[0])
res2 = self.ldb_admin.search(self.base_dn,
expression="samaccountname=Administrator",
controls=["dirsync:1:0:1"])
count2 = len(res2.msgs[0])
self.assertEqual(count, count2)
# Asking for cn will return nothing on objects that have CN as RDN
res = self.ldb_admin.search(self.base_dn,
expression="samaccountname=Administrator",
attrs=["cn"],
controls=["dirsync:1:0:1"])
self.assertEqual(len(res.msgs), 0)
# Asking for parentGUID will return nothing too
res = self.ldb_admin.search(self.base_dn,
expression="samaccountname=Administrator",
attrs=["parentGUID"],
controls=["dirsync:1:0:1"])
self.assertEqual(len(res.msgs), 0)
ouname = "OU=testou,%s" % self.base_dn
self.ouname = ouname
self.ldb_admin.create_ou(ouname)
delta = Message()
delta.dn = Dn(self.ldb_admin, str(ouname))
delta["cn"] = MessageElement("test ou",
FLAG_MOD_ADD,
"cn")
self.ldb_admin.modify(delta)
res = self.ldb_admin.search(self.base_dn,
expression="name=testou",
attrs=["cn"],
controls=["dirsync:1:0:1"])
self.assertEqual(len(res.msgs), 1)
self.assertEqual(len(res.msgs[0]), 3)
delete_force(self.ldb_admin, ouname)
def test_dirsync_with_controls(self):
"""Check that dirsync return correct information when dealing with the NC"""
res = self.ldb_admin.search(self.base_dn,
expression="(distinguishedName=%s)" % str(self.base_dn),
attrs=["name"],
controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
def test_dirsync_basenc(self):
"""Check that dirsync return correct information when dealing with the NC"""
res = self.ldb_admin.search(self.base_dn,
expression="(distinguishedName=%s)" % str(self.base_dn),
attrs=["name"],
controls=["dirsync:1:0:10000"])
self.assertEqual(len(res.msgs), 1)
self.assertEqual(len(res.msgs[0]), 3)
res = self.ldb_admin.search(self.base_dn,
expression="(distinguishedName=%s)" % str(self.base_dn),
attrs=["ntSecurityDescriptor"],
controls=["dirsync:1:0:10000"])
self.assertEqual(len(res.msgs), 1)
self.assertEqual(len(res.msgs[0]), 3)
def test_dirsync_othernc(self):
"""Check that dirsync return information for entries that are normally referrals (ie. other NCs)"""
res = self.ldb_admin.search(self.base_dn,
expression="(objectclass=configuration)",
attrs=["name"],
controls=["dirsync:1:0:10000"])
self.assertEqual(len(res.msgs), 1)
self.assertEqual(len(res.msgs[0]), 4)
res = self.ldb_admin.search(self.base_dn,
expression="(objectclass=configuration)",
attrs=["ntSecurityDescriptor"],
controls=["dirsync:1:0:10000"])
self.assertEqual(len(res.msgs), 1)
self.assertEqual(len(res.msgs[0]), 3)
res = self.ldb_admin.search(self.base_dn,
expression="(objectclass=domaindns)",
attrs=["ntSecurityDescriptor"],
controls=["dirsync:1:0:10000"])
nb = len(res.msgs)
# only sub nc returns a result when asked for objectGUID
res = self.ldb_admin.search(self.base_dn,
expression="(objectclass=domaindns)",
attrs=["objectGUID"],
controls=["dirsync:1:0:0"])
self.assertEqual(len(res.msgs), nb - 1)
if nb > 1:
self.assertTrue(res.msgs[0].get("objectGUID") is not None)
else:
res = self.ldb_admin.search(self.base_dn,
expression="(objectclass=configuration)",
attrs=["objectGUID"],
controls=["dirsync:1:0:0"])
def test_dirsync_send_delta(self):
"""Check that dirsync return correct delta when sending the last cookie"""
res = self.ldb_admin.search(self.base_dn,
expression="(&(samaccountname=test*)(!(isDeleted=*)))",
controls=["dirsync:1:0:10000"])
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "0"
ctl[3] = "10000"
control = str(":".join(ctl))
res = self.ldb_admin.search(self.base_dn,
expression="(&(samaccountname=test*)(!(isDeleted=*)))",
controls=[control])
self.assertEqual(len(res), 0)
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
controls=["dirsync:1:0:100000"])
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "0"
ctl[3] = "10000"
control2 = str(":".join(ctl))
# Let's create an OU
ouname = "OU=testou2,%s" % self.base_dn
self.ouname = ouname
self.ldb_admin.create_ou(ouname)
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
controls=[control2])
self.assertEqual(len(res), 1)
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "0"
ctl[3] = "10000"
control3 = str(":".join(ctl))
delta = Message()
delta.dn = Dn(self.ldb_admin, str(ouname))
delta["cn"] = MessageElement("test ou",
FLAG_MOD_ADD,
"cn")
self.ldb_admin.modify(delta)
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
controls=[control3])
self.assertEqual(len(res.msgs), 1)
# 3 attributes: instanceType, cn and objectGUID
self.assertEqual(len(res.msgs[0]), 3)
delta = Message()
delta.dn = Dn(self.ldb_admin, str(ouname))
delta["cn"] = MessageElement([],
FLAG_MOD_DELETE,
"cn")
self.ldb_admin.modify(delta)
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
controls=[control3])
self.assertEqual(len(res.msgs), 1)
# So we won't have much attribute returned but instanceType and GUID
# are.
# 3 attributes: instanceType and objectGUID and cn but empty
self.assertEqual(len(res.msgs[0]), 3)
ouname = "OU=newouname,%s" % self.base_dn
self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
self.ouname = ouname
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "0"
ctl[3] = "10000"
control4 = str(":".join(ctl))
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
controls=[control4])
self.assertTrue(res[0].get("parentGUID") is not None)
self.assertTrue(res[0].get("name") is not None)
delete_force(self.ldb_admin, ouname)
def test_dirsync_linkedattributes_OBJECT_SECURITY(self):
"""Check that dirsync returned deleted objects too"""
# Let's search for members
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
res = self.ldb_simple.search(self.base_dn,
expression="(name=Administrators)",
controls=["dirsync:1:1:1"])
self.assertTrue(len(res[0].get("member")) > 0)
size = len(res[0].get("member"))
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "1"
ctl[3] = "10000"
control1 = str(":".join(ctl))
self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
add_members_operation=True)
res = self.ldb_simple.search(self.base_dn,
expression="(name=Administrators)",
controls=[control1])
self.assertEqual(len(res[0].get("member")), size + 1)
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "1"
ctl[3] = "10000"
control1 = str(":".join(ctl))
# remove the user from the group
self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
add_members_operation=False)
res = self.ldb_simple.search(self.base_dn,
expression="(name=Administrators)",
controls=[control1])
self.assertEqual(len(res[0].get("member")), size)
self.ldb_admin.newgroup("testgroup")
self.addCleanup(self.ldb_admin.deletegroup, "testgroup")
self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
add_members_operation=True)
res = self.ldb_admin.search(self.base_dn,
expression="(name=testgroup)",
controls=["dirsync:1:0:1"])
self.assertEqual(len(res[0].get("member")), 1)
self.assertTrue(res[0].get("member") != "")
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "0"
ctl[3] = "1"
control1 = str(":".join(ctl))
# Check that reasking the same question but with an updated cookie
# didn't return any results.
print(control1)
res = self.ldb_admin.search(self.base_dn,
expression="(name=testgroup)",
controls=[control1])
self.assertEqual(len(res), 0)
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "1"
ctl[3] = "10000"
control1 = str(":".join(ctl))
self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
add_members_operation=False)
res = self.ldb_admin.search(self.base_dn,
expression="(name=testgroup)",
attrs=["member"],
controls=[control1])
self.assertEqual(len(res[0].get("member")), 0)
def test_dirsync_deleted_items(self):
"""Check that dirsync returned deleted objects too"""
# Let's create an OU
ouname = "OU=testou3,%s" % self.base_dn
self.ouname = ouname
self.ldb_admin.create_ou(ouname)
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
controls=["dirsync:1:0:1"])
guid = None
for e in res:
if str(e["name"]) == "testou3":
guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "0"
ctl[3] = "10000"
control1 = str(":".join(ctl))
# So now delete the object and check that
# we can see the object but deleted when admin
delete_force(self.ldb_admin, ouname)
res = self.ldb_admin.search(self.base_dn,
expression="(objectClass=organizationalUnit)",
controls=[control1])
self.assertEqual(len(res), 1)
guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
self.assertEqual(guid2, guid)
self.assertTrue(res[0].get("isDeleted"))
self.assertTrue(res[0].get("name") is not None)
def test_cookie_from_others(self):
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
controls=["dirsync:1:0:1"])
ctl = str(res.controls[0]).split(":")
cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
controls = ["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
controls=controls)
def test_dirsync_linkedattributes_range(self):
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
res = self.ldb_admin.search(self.base_dn,
attrs=["member;range=1-1"],
expression="(name=Administrators)",
controls=["dirsync:1:0:0"])
self.assertTrue(len(res) > 0)
self.assertTrue(res[0].get("member;range=1-1") is None)
self.assertTrue(res[0].get("member") is not None)
self.assertTrue(len(res[0].get("member")) > 0)
def test_dirsync_linkedattributes_range_user(self):
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
try:
res = self.ldb_simple.search(self.base_dn,
attrs=["member;range=1-1"],
expression="(name=Administrators)",
controls=["dirsync:1:0:0"])
except LdbError as e:
(num, _) = e.args
self.assertEqual(num, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS)
else:
self.fail()
def test_dirsync_linkedattributes(self):
flag_incr_linked = 2147483648
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
res = self.ldb_admin.search(self.base_dn,
attrs=["member"],
expression="(name=Administrators)",
controls=["dirsync:1:%d:1" % flag_incr_linked])
self.assertTrue(res[0].get("member;range=1-1") is not None)
self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
size = len(res[0].get("member;range=1-1"))
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "%d" % flag_incr_linked
ctl[3] = "10000"
control1 = str(":".join(ctl))
self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
add_members_operation=True)
self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
add_members_operation=True)
res = self.ldb_admin.search(self.base_dn,
expression="(name=Administrators)",
controls=[control1])
self.assertEqual(len(res[0].get("member;range=1-1")), 2)
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "%d" % flag_incr_linked
ctl[3] = "10000"
control1 = str(":".join(ctl))
# remove the user from the group
self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
add_members_operation=False)
res = self.ldb_admin.search(self.base_dn,
expression="(name=Administrators)",
controls=[control1])
self.assertEqual(res[0].get("member;range=1-1"), None)
self.assertEqual(len(res[0].get("member;range=0-0")), 1)
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "%d" % flag_incr_linked
ctl[3] = "10000"
control2 = str(":".join(ctl))
self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
add_members_operation=False)
res = self.ldb_admin.search(self.base_dn,
expression="(name=Administrators)",
controls=[control2])
self.assertEqual(res[0].get("member;range=1-1"), None)
self.assertEqual(len(res[0].get("member;range=0-0")), 1)
res = self.ldb_admin.search(self.base_dn,
expression="(name=Administrators)",
controls=[control1])
self.assertEqual(res[0].get("member;range=1-1"), None)
self.assertEqual(len(res[0].get("member;range=0-0")), 2)
def test_dirsync_extended_dn(self):
"""Check that dirsync works together with the extended_dn control"""
# Let's search for members
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
res = self.ldb_simple.search(self.base_dn,
expression="(name=Administrators)",
controls=["dirsync:1:1:1"])
self.assertTrue(len(res[0].get("member")) > 0)
size = len(res[0].get("member"))
resEX1 = self.ldb_simple.search(self.base_dn,
expression="(name=Administrators)",
controls=["dirsync:1:1:1","extended_dn:1:1"])
self.assertTrue(len(resEX1[0].get("member")) > 0)
sizeEX1 = len(resEX1[0].get("member"))
self.assertEqual(sizeEX1, size)
self.assertIn(res[0]["member"][0], resEX1[0]["member"][0])
self.assertIn(b"<GUID=", resEX1[0]["member"][0])
self.assertIn(b">;<SID=S-1-5-21-", resEX1[0]["member"][0])
resEX0 = self.ldb_simple.search(self.base_dn,
expression="(name=Administrators)",
controls=["dirsync:1:1:1","extended_dn:1:0"])
self.assertTrue(len(resEX0[0].get("member")) > 0)
sizeEX0 = len(resEX0[0].get("member"))
self.assertEqual(sizeEX0, size)
self.assertIn(res[0]["member"][0], resEX0[0]["member"][0])
self.assertIn(b"<GUID=", resEX0[0]["member"][0])
self.assertIn(b">;<SID=010500000000000515", resEX0[0]["member"][0])
def test_dirsync_deleted_items_OBJECT_SECURITY(self):
"""Check that dirsync returned deleted objects too"""
# Let's create an OU
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
ouname = "OU=testou3,%s" % self.base_dn
self.ouname = ouname
self.ldb_admin.create_ou(ouname)
# Specify LDAP_DIRSYNC_OBJECT_SECURITY
res = self.ldb_simple.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
controls=["dirsync:1:1:1"])
guid = None
for e in res:
if str(e["name"]) == "testou3":
guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
self.assertTrue(guid is not None)
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
ctl[2] = "1"
ctl[3] = "10000"
control1 = str(":".join(ctl))
# So now delete the object and check that
# we can see the object but deleted when admin
# we just see the objectGUID when simple user
delete_force(self.ldb_admin, ouname)
res = self.ldb_simple.search(self.base_dn,
expression="(objectClass=organizationalUnit)",
controls=[control1])
self.assertEqual(len(res), 1)
guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
self.assertEqual(guid2, guid)
self.assertEqual(str(res[0].dn), "")
def test_dirsync_unicodePwd(self):
res = self.ldb_admin.search(self.base_dn,
attrs=["unicodePwd", "supplementalCredentials", "samAccountName"],
expression="(samAccountName=krbtgt)",
controls=["dirsync:1:0:0"])
self.assertTrue(len(res) == 1)
# This form ensures this is a case insensitive comparison
self.assertTrue("samAccountName" in res[0])
self.assertTrue(res[0].get("samAccountName"))
self.assertTrue(res[0].get("unicodePwd") is None)
self.assertTrue(res[0].get("supplementalCredentials") is None)
if not getattr(opts, "listtests", False):
lp = sambaopts.get_loadparm()
samba.tests.cmdline_credentials = credopts.get_credentials(lp)
TestProgram(module=__name__, opts=subunitopts)