From c2fa3488317fa6b5eac0c7acc1fbed1e8cebcb7d Mon Sep 17 00:00:00 2001 From: Matthieu Patou Date: Sun, 27 Feb 2011 12:24:45 +0300 Subject: [PATCH] s4-dsdb: add unit tests for dirsync control Signed-off-by: Andrew Tridgell --- source4/dsdb/tests/python/dirsync.py | 713 +++++++++++++++++++++++++++ source4/selftest/knownfail | 2 + source4/selftest/tests.py | 1 + 3 files changed, 716 insertions(+) create mode 100755 source4/dsdb/tests/python/dirsync.py diff --git a/source4/dsdb/tests/python/dirsync.py b/source4/dsdb/tests/python/dirsync.py new file mode 100755 index 00000000000..96c69508418 --- /dev/null +++ b/source4/dsdb/tests/python/dirsync.py @@ -0,0 +1,713 @@ +#!/usr/bin/env python +# +# Unit tests for dirsync control +# Copyright (C) Matthieu Patou 2011 +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import optparse +import sys +sys.path.insert(0, "bin/python") +import samba +samba.ensure_external_module("testtools", "testtools") +samba.ensure_external_module("subunit", "subunit/python") + +import samba.getopt as options +import base64 + +from ldb import LdbError, SCOPE_BASE +from ldb import Message, MessageElement, Dn +from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE +from samba.dcerpc import security, misc, drsblobs +from samba.ndr import ndr_unpack, ndr_pack + +from samba.auth import system_session +from samba import gensec, sd_utils +from samba.samdb import SamDB +from samba.credentials import Credentials +import samba.tests +from samba.tests import delete_force +from subunit.run import SubunitTestRunner +import unittest + +parser = optparse.OptionParser("dirsync.py [options] ") +sambaopts = options.SambaOptions(parser) +parser.add_option_group(sambaopts) +parser.add_option_group(options.VersionOptions(parser)) + +# use command line creds if available +credopts = options.CredentialsOptions(parser) +parser.add_option_group(credopts) +opts, args = parser.parse_args() + +if len(args) < 1: + parser.print_usage() + sys.exit(1) + +host = args[0] +if not "://" in host: + ldaphost = "ldap://%s" % host + ldapshost = "ldaps://%s" % host +else: + ldaphost = host + start = host.rindex("://") + host = host.lstrip(start+3) + +lp = sambaopts.get_loadparm() +creds = credopts.get_credentials(lp) + +# +# Tests start here +# + +class DirsyncBaseTests(samba.tests.TestCase): + + def setUp(self): + super(DirsyncBaseTests, self).setUp() + self.ldb_admin = ldb + self.base_dn = ldb.domain_dn() + self.domain_sid = security.dom_sid(ldb.get_domain_sid()) + self.user_pass = "samba123@AAA" + self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized() + self.sd_utils = sd_utils.SDUtils(ldb) + #used for anonymous login + print "baseDN: %s" % self.base_dn + + def get_user_dn(self, name): + return "CN=%s,CN=Users,%s" % (name, self.base_dn) + + def get_ldb_connection(self, target_username, target_password): + creds_tmp = Credentials() + creds_tmp.set_username(target_username) + creds_tmp.set_password(target_password) + creds_tmp.set_domain(creds.get_domain()) + creds_tmp.set_realm(creds.get_realm()) + creds_tmp.set_workstation(creds.get_workstation()) + creds_tmp.set_gensec_features(creds_tmp.get_gensec_features() + | gensec.FEATURE_SEAL) + ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp) + return ldb_target + + +#tests on ldap add operations +class SimpleDirsyncTests(DirsyncBaseTests): + + def setUp(self): + super(SimpleDirsyncTests, self).setUp() + # Regular user + self.dirsync_user = "test_dirsync_user" + self.simple_user = "test_simple_user" + self.admin_user = "test_admin_user" + self.ouname = None + + self.ldb_admin.newuser(self.dirsync_user, self.user_pass) + self.ldb_admin.newuser(self.simple_user, self.user_pass) + self.ldb_admin.newuser(self.admin_user, self.user_pass) + self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn) + + user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user)) + mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid) + self.sd_utils.dacl_add_ace(self.base_dn, mod) + + # add admins to the Domain Admins group + self.ldb_admin.add_remove_group_members("Domain Admins", self.admin_user, + add_members_operation=True) + + def tearDown(self): + super(SimpleDirsyncTests, self).tearDown() + delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user)) + delete_force(self.ldb_admin, self.get_user_dn(self.simple_user)) + delete_force(self.ldb_admin, self.get_user_dn(self.admin_user)) + if self.ouname: + delete_force(self.ldb_admin, self.ouname) + self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl) + try: + self.ldb_admin.deletegroup("testgroup") + except: + pass + + #def test_dirsync_errors(self): + + + def test_dirsync_supported(self): + """Test the basic of the dirsync is supported""" + self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass) + self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass) + res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"]) + res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"]) + try: + self.ldb_simple.search(self.base_dn, + expression="samaccountname=*", + controls=["dirsync:1:0:1"]) + except LdbError,l: + self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1) + + def test_parentGUID_referrals(self): + res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"]) + + res = self.ldb_admin.search(self.base_dn, + expression="name=Configuration", + controls=["dirsync:1:0:1"]) + self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID")) + + def test_ok_not_rootdc(self): + """Test if it's ok to do dirsync on another NC that is not the root DC""" + try: + res = self.ldb_admin.search("CN=Configuration, %s" % self.base_dn, + expression="samaccountname=*", + controls=["dirsync:1:0:1"]) + except: + self.assertTrue(False) + + def test_dirsync_errors(self): + """Test if dirsync returns the correct LDAP errors in case of pb""" + self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass) + self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass) + try: + self.ldb_simple.search(self.base_dn, + expression="samaccountname=*", + controls=["dirsync:1:0:1"]) + except LdbError,l: + print l + self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1) + + try: + self.ldb_simple.search("CN=Users,%s" % self.base_dn, + expression="samaccountname=*", + controls=["dirsync:1:0:1"]) + except LdbError,l: + print l + self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1) + + try: + self.ldb_simple.search("CN=Users,%s" % self.base_dn, + expression="samaccountname=*", + controls=["dirsync:1:1:1"]) + except LdbError,l: + print l + self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1) + + try: + self.ldb_dirsync.search("CN=Users,%s" % self.base_dn, + expression="samaccountname=*", + controls=["dirsync:1:0:1"]) + except LdbError,l: + print l + self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1) + + try: + self.ldb_admin.search("CN=Users,%s" % self.base_dn, + expression="samaccountname=*", + controls=["dirsync:1:0:1"]) + except LdbError,l: + print l + self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1) + + try: + self.ldb_admin.search("CN=Users,%s" % self.base_dn, + expression="samaccountname=*", + controls=["dirsync:1:1:1"]) + except LdbError,l: + print l + self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1) + + + + + def test_dirsync_attributes(self): + """Check behavior with some attributes """ + res = self.ldb_admin.search(self.base_dn, + expression="samaccountname=*", + controls=["dirsync:1:0:1"]) + # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync + self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") != None) + # Check that non replicated attributes are not returned + self.assertTrue(res.msgs[0].get("badPwdCount") == None) + # Check that non forward link are not returned + self.assertTrue(res.msgs[0].get("memberof") == None) + + # Asking for instanceType will return also objectGUID + res = self.ldb_admin.search(self.base_dn, + expression="samaccountname=Administrator", + attrs=["instanceType"], + controls=["dirsync:1:0:1"]) + self.assertTrue(res.msgs[0].get("objectGUID") != None) + self.assertTrue(res.msgs[0].get("instanceType") != None) + + # We don't return an entry if asked for objectGUID + res = self.ldb_admin.search(self.base_dn, + expression="dn=%s" % self.base_dn, + attrs=["objectGUID"], + controls=["dirsync:1:0:1"]) + self.assertEquals(len(res.msgs), 0) + + # a request on the root of a NC didn't return parentGUID + res = self.ldb_admin.search(self.base_dn, + expression="dn=%s" % self.base_dn, + attrs=["name"], + controls=["dirsync:1:0:1"]) + self.assertTrue(res.msgs[0].get("objectGUID") != None) + self.assertTrue(res.msgs[0].get("name") != None) + self.assertTrue(res.msgs[0].get("parentGUID") == None) + self.assertTrue(res.msgs[0].get("instanceType") != None) + + # Asking for name will return also objectGUID and parentGUID + # and instanceType and of course name + res = self.ldb_admin.search(self.base_dn, + expression="samaccountname=Administrator", + attrs=["name"], + controls=["dirsync:1:0:1"]) + self.assertTrue(res.msgs[0].get("objectGUID") != None) + self.assertTrue(res.msgs[0].get("name") != None) + self.assertTrue(res.msgs[0].get("parentGUID") != None) + self.assertTrue(res.msgs[0].get("instanceType") != None) + + # Asking for dn will not return not only DN but more like if attrs=* + # parentGUID should be returned + res = self.ldb_admin.search(self.base_dn, + expression="samaccountname=Administrator", + attrs=["dn"], + controls=["dirsync:1:0:1"]) + count = len(res.msgs[0]) + res2 = self.ldb_admin.search(self.base_dn, + expression="samaccountname=Administrator", + controls=["dirsync:1:0:1"]) + count2 = len(res2.msgs[0]) + self.assertEqual(count, count2) + + # Asking for cn will return nothing on objects that have CN as RDN + res = self.ldb_admin.search(self.base_dn, + expression="samaccountname=Administrator", + attrs=["cn"], + controls=["dirsync:1:0:1"]) + self.assertEqual(len(res.msgs), 0) + # Asking for parentGUID will return nothing too + res = self.ldb_admin.search(self.base_dn, + expression="samaccountname=Administrator", + attrs=["parentGUID"], + controls=["dirsync:1:0:1"]) + self.assertEqual(len(res.msgs), 0) + ouname="OU=testou,%s" % self.base_dn + self.ouname = ouname + self.ldb_admin.create_ou(ouname) + delta = Message() + delta.dn = Dn(self.ldb_admin, str(ouname)) + delta["cn"] = MessageElement("test ou", + FLAG_MOD_ADD, + "cn" ) + self.ldb_admin.modify(delta) + res = self.ldb_admin.search(self.base_dn, + expression="name=testou", + attrs=["cn"], + controls=["dirsync:1:0:1"]) + + self.assertEqual(len(res.msgs), 1) + self.assertEqual(len(res.msgs[0]), 3) + delete_force(self.ldb_admin, ouname) + + def test_dirsync_with_controls(self): + """Check that dirsync return correct informations when dealing with the NC""" + res = self.ldb_admin.search(self.base_dn, + expression="(dn=%s)" % str(self.base_dn), + attrs=["name"], + controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"]) + + def test_dirsync_basenc(self): + """Check that dirsync return correct informations when dealing with the NC""" + res = self.ldb_admin.search(self.base_dn, + expression="(dn=%s)" % str(self.base_dn), + attrs=["name"], + controls=["dirsync:1:0:10000"]) + self.assertEqual(len(res.msgs), 1) + self.assertEqual(len(res.msgs[0]), 3) + + res = self.ldb_admin.search(self.base_dn, + expression="(dn=%s)" % str(self.base_dn), + attrs=["ntSecurityDescriptor"], + controls=["dirsync:1:0:10000"]) + self.assertEqual(len(res.msgs), 1) + self.assertEqual(len(res.msgs[0]), 3) + + def test_dirsync_othernc(self): + """Check that dirsync return information for entries that are normaly referrals (ie. other NCs)""" + res = self.ldb_admin.search(self.base_dn, + expression="(objectclass=configuration)", + attrs=["name"], + controls=["dirsync:1:0:10000"]) + self.assertEqual(len(res.msgs), 1) + self.assertEqual(len(res.msgs[0]), 4) + + res = self.ldb_admin.search(self.base_dn, + expression="(objectclass=configuration)", + attrs=["ntSecurityDescriptor"], + controls=["dirsync:1:0:10000"]) + self.assertEqual(len(res.msgs), 1) + self.assertEqual(len(res.msgs[0]), 3) + + res = self.ldb_admin.search(self.base_dn, + expression="(objectclass=domaindns)", + attrs=["ntSecurityDescriptor"], + controls=["dirsync:1:0:10000"]) + nb = len(res.msgs) + + # only sub nc returns a result when asked for objectGUID + res = self.ldb_admin.search(self.base_dn, + expression="(objectclass=domaindns)", + attrs=["objectGUID"], + controls=["dirsync:1:0:0"]) + self.assertEqual(len(res.msgs), nb - 1) + if nb > 1: + self.assertTrue(res.msgs[0].get("objectGUID") != None) + else: + res = self.ldb_admin.search(self.base_dn, + expression="(objectclass=configuration)", + attrs=["objectGUID"], + controls=["dirsync:1:0:0"]) + + + def test_dirsync_send_delta(self): + """Check that dirsync return correct delta when sending the last cookie""" + res = self.ldb_admin.search(self.base_dn, + expression="(&(samaccountname=test*)(!(isDeleted=*)))", + controls=["dirsync:1:0:10000"]) + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "0" + ctl[3] = "10000" + control = str(":".join(ctl)) + res = self.ldb_admin.search(self.base_dn, + expression="(&(samaccountname=test*)(!(isDeleted=*)))", + controls=[control]) + self.assertEqual(len(res), 0) + + res = self.ldb_admin.search(self.base_dn, + expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", + controls=["dirsync:1:0:100000"]) + + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "0" + ctl[3] = "10000" + control2 = str(":".join(ctl)) + + # Let's create an OU + ouname="OU=testou2,%s" % self.base_dn + self.ouname = ouname + self.ldb_admin.create_ou(ouname) + res = self.ldb_admin.search(self.base_dn, + expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", + controls=[control2]) + self.assertEqual(len(res), 1) + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "0" + ctl[3] = "10000" + control3 = str(":".join(ctl)) + + delta = Message() + delta.dn = Dn(self.ldb_admin, str(ouname)) + + delta["cn"] = MessageElement("test ou", + FLAG_MOD_ADD, + "cn" ) + self.ldb_admin.modify(delta) + res = self.ldb_admin.search(self.base_dn, + expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", + controls=[control3]) + + self.assertEqual(len(res.msgs), 1) + # 3 attributes: instanceType, cn and objectGUID + self.assertEqual(len(res.msgs[0]), 3) + + delta = Message() + delta.dn = Dn(self.ldb_admin, str(ouname)) + delta["cn"] = MessageElement([], + FLAG_MOD_DELETE, + "cn" ) + self.ldb_admin.modify(delta) + res = self.ldb_admin.search(self.base_dn, + expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", + controls=[control3]) + + self.assertEqual(len(res.msgs), 1) + # So we won't have much attribute returned but instanceType and GUID + # are. + # 3 attributes: instanceType and objectGUID and cn but empty + self.assertEqual(len(res.msgs[0]), 3) + ouname = "OU=newouname,%s" % self.base_dn + self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname))) + self.ouname = ouname + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "0" + ctl[3] = "10000" + control4 = str(":".join(ctl)) + res = self.ldb_admin.search(self.base_dn, + expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", + controls=[control3]) + + self.assertTrue(res[0].get("parentGUID") != None) + self.assertTrue(res[0].get("name") != None) + delete_force(self.ldb_admin, ouname) + + def test_dirsync_linkedattributes(self): + """Check that dirsync returnd deleted objects too""" + # Let's search for members + self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass) + res = self.ldb_simple.search(self.base_dn, + expression="(name=Administrators)", + controls=["dirsync:1:1:1"]) + + self.assertTrue(len(res[0].get("member")) > 0) + size = len(res[0].get("member")) + + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "1" + ctl[3] = "10000" + control1 = str(":".join(ctl)) + self.ldb_admin.add_remove_group_members("Administrators", self.simple_user, + add_members_operation=True) + + res = self.ldb_simple.search(self.base_dn, + expression="(name=Administrators)", + controls=[control1]) + + self.assertEqual(len(res[0].get("member")), size + 1) + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "1" + ctl[3] = "10000" + control1 = str(":".join(ctl)) + + # remove the user from the group + self.ldb_admin.add_remove_group_members("Administrators", self.simple_user, + add_members_operation=False) + + res = self.ldb_simple.search(self.base_dn, + expression="(name=Administrators)", + controls=[control1]) + + self.assertEqual(len(res[0].get("member")), size ) + + self.ldb_admin.newgroup("testgroup") + self.ldb_admin.add_remove_group_members("testgroup", self.simple_user, + add_members_operation=True) + + res = self.ldb_admin.search(self.base_dn, + expression="(name=testgroup)", + controls=["dirsync:1:0:1"]) + + self.assertEqual(len(res[0].get("member")), 1) + self.assertTrue(res[0].get("member") != "" ) + + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "0" + ctl[3] = "1" + control1 = str(":".join(ctl)) + + # Check that reasking the same question but with an updated cookie + # didn't return any results. + print control1 + res = self.ldb_admin.search(self.base_dn, + expression="(name=testgroup)", + controls=[control1]) + self.assertEqual(len(res), 0) + + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "1" + ctl[3] = "10000" + control1 = str(":".join(ctl)) + + self.ldb_admin.add_remove_group_members("testgroup", self.simple_user, + add_members_operation=False) + + res = self.ldb_admin.search(self.base_dn, + expression="(name=testgroup)", + attrs=["member"], + controls=[control1]) + + self.ldb_admin.deletegroup("testgroup") + self.assertEqual(len(res[0].get("member")), 0) + + + + def test_dirsync_deleted_items(self): + """Check that dirsync returnd deleted objects too""" + # Let's create an OU + ouname="OU=testou3,%s" % self.base_dn + self.ouname = ouname + self.ldb_admin.create_ou(ouname) + res = self.ldb_admin.search(self.base_dn, + expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", + controls=["dirsync:1:0:1"]) + guid = None + for e in res: + if str(e["name"]) == "testou3": + guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0])) + + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "0" + ctl[3] = "10000" + control1 = str(":".join(ctl)) + + # So now delete the object and check that + # we can see the object but deleted when admin + delete_force(self.ldb_admin, ouname) + + res = self.ldb_admin.search(self.base_dn, + expression="(objectClass=organizationalUnit)", + controls=[control1]) + self.assertEqual(len(res), 1) + guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0])) + self.assertEqual(guid2, guid) + self.assertTrue(res[0].get("isDeleted")) + self.assertTrue(res[0].get("name") != None) + + def test_cookie_from_others(self): + res = self.ldb_admin.search(self.base_dn, + expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", + controls=["dirsync:1:0:1"]) + ctl = str(res.controls[0]).split(":") + cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4]))) + cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db") + controls=["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie))] + res = self.ldb_admin.search(self.base_dn, + expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", + controls=controls) + +class ExtendedDirsyncTests(SimpleDirsyncTests): + def test_dirsync_linkedattributes(self): + flag_incr_linked = 2147483648 + self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass) + res = self.ldb_admin.search(self.base_dn, + attrs=["member"], + expression="(name=Administrators)", + controls=["dirsync:1:%d:1" % flag_incr_linked]) + + self.assertTrue(res[0].get("member;range=1-1") != None ) + self.assertTrue(len(res[0].get("member;range=1-1")) > 0) + size = len(res[0].get("member;range=1-1")) + + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "%d" % flag_incr_linked + ctl[3] = "10000" + control1 = str(":".join(ctl)) + self.ldb_admin.add_remove_group_members("Administrators", self.simple_user, + add_members_operation=True) + self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user, + add_members_operation=True) + + + res = self.ldb_admin.search(self.base_dn, + expression="(name=Administrators)", + controls=[control1]) + + self.assertEqual(len(res[0].get("member;range=1-1")), 2) + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "%d" % flag_incr_linked + ctl[3] = "10000" + control1 = str(":".join(ctl)) + + # remove the user from the group + self.ldb_admin.add_remove_group_members("Administrators", self.simple_user, + add_members_operation=False) + + res = self.ldb_admin.search(self.base_dn, + expression="(name=Administrators)", + controls=[control1]) + + self.assertEqual(res[0].get("member;range=1-1"), None ) + self.assertEqual(len(res[0].get("member;range=0-0")), 1) + + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "%d" % flag_incr_linked + ctl[3] = "10000" + control2 = str(":".join(ctl)) + + self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user, + add_members_operation=False) + + res = self.ldb_admin.search(self.base_dn, + expression="(name=Administrators)", + controls=[control2]) + + self.assertEqual(res[0].get("member;range=1-1"), None ) + self.assertEqual(len(res[0].get("member;range=0-0")), 1) + + res = self.ldb_admin.search(self.base_dn, + expression="(name=Administrators)", + controls=[control1]) + + self.assertEqual(res[0].get("member;range=1-1"), None ) + self.assertEqual(len(res[0].get("member;range=0-0")), 2) + + def test_dirsync_deleted_items(self): + """Check that dirsync returnd deleted objects too""" + # Let's create an OU + self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass) + ouname="OU=testou3,%s" % self.base_dn + self.ouname = ouname + self.ldb_admin.create_ou(ouname) + + # Specify LDAP_DIRSYNC_OBJECT_SECURITY + res = self.ldb_simple.search(self.base_dn, + expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))", + controls=["dirsync:1:1:1"]) + + guid = None + for e in res: + if str(e["name"]) == "testou3": + guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0])) + + self.assertTrue(guid != None) + ctl = str(res.controls[0]).split(":") + ctl[1] = "1" + ctl[2] = "1" + ctl[3] = "10000" + control1 = str(":".join(ctl)) + + # So now delete the object and check that + # we can see the object but deleted when admin + # we just see the objectGUID when simple user + delete_force(self.ldb_admin, ouname) + + res = self.ldb_simple.search(self.base_dn, + expression="(objectClass=organizationalUnit)", + controls=[control1]) + self.assertEqual(len(res), 1) + guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0])) + self.assertEqual(guid2, guid) + self.assertEqual(str(res[0].dn), "") + + +ldb = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp) + +runner = SubunitTestRunner() +rc = 0 +# +if not runner.run(unittest.makeSuite(SimpleDirsyncTests)).wasSuccessful(): + rc = 1 +if not runner.run(unittest.makeSuite(ExtendedDirsyncTests)).wasSuccessful(): + rc = 1 + +sys.exit(rc) diff --git a/source4/selftest/knownfail b/source4/selftest/knownfail index d3c3f4ee6da..52e56ea5949 100644 --- a/source4/selftest/knownfail +++ b/source4/selftest/knownfail @@ -83,3 +83,5 @@ samba4.smb2.compound.*.invalid2 samba4.ldap.acl.*.search.* # ACL search behaviour not enabled by default samba4.ldap.acl.*.ntSecurityDescriptor.* # ACL extended checks on search not enabled by default samba4.nbt.winsreplication.owned # fails sometimes, timing related +samba4.ldap.dirsync.python.dc..__main__.ExtendedDirsyncTests.test_dirsync_deleted_items +#samba4.ldap.dirsync.python.dc..__main__.ExtendedDirsyncTests.* diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index eb36a0c2b12..54c2b222e1e 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -388,6 +388,7 @@ plantestsuite("samba4.tokengroups.python(dc)", "dc:local", [python, os.path.join plantestsuite("samba4.sam.python(dc)", "dc", [python, os.path.join(samba4srcdir, "dsdb/tests/python/sam.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '-W', '$DOMAIN']) plansambapythontestsuite("samba4.schemaInfo.python(dc)", "dc", os.path.join(samba4srcdir, 'dsdb/tests/python'), 'dsdb_schema_info', extra_args=['-U"$DOMAIN/$DC_USERNAME%$DC_PASSWORD"']) plantestsuite("samba4.urgent_replication.python(dc)", "dc", [python, os.path.join(samba4srcdir, "dsdb/tests/python/urgent_replication.py"), '$PREFIX_ABS/dc/private/sam.ldb'], allow_empty_output=True) +plantestsuite("samba4.ldap.dirsync.python(dc)", "dc", [python, os.path.join(samba4srcdir, "dsdb/tests/python/dirsync.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '-W', '$DOMAIN']) for env in ["dc", "fl2000dc", "fl2003dc", "fl2008r2dc"]: plantestsuite("samba4.ldap_schema.python(%s)" % env, env, [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap_schema.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '-W', '$DOMAIN']) plantestsuite("samba4.ldap.possibleInferiors.python(%s)" % env, env, [python, os.path.join(samba4srcdir, "dsdb/samdb/ldb_modules/tests/possibleinferiors.py"), "ldap://$SERVER", '-U"$USERNAME%$PASSWORD"', "-W", "$DOMAIN"])