2018-12-12 13:40:43 +13:00
#!/usr/bin/env python3
2011-02-27 12:24:45 +03:00
#
# Unit tests for dirsync control
# Copyright (C) Matthieu Patou <mat@matws.net> 2011
2014-11-01 21:17:39 -07:00
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
2011-02-27 12:24:45 +03:00
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import optparse
import sys
sys . path . insert ( 0 , " bin/python " )
import samba
2014-11-01 21:17:39 -07:00
from samba . tests . subunitrun import TestProgram , SubunitOptions
2011-02-27 12:24:45 +03:00
import samba . getopt as options
import base64
2019-10-15 16:28:46 +13:00
import ldb
2011-02-27 12:24:45 +03:00
from ldb import LdbError , SCOPE_BASE
from ldb import Message , MessageElement , Dn
from ldb import FLAG_MOD_ADD , FLAG_MOD_DELETE
2018-10-11 17:07:05 +13:00
from samba . dcerpc import security , misc , drsblobs
2011-02-27 12:24:45 +03:00
from samba . ndr import ndr_unpack , ndr_pack
from samba . auth import system_session
from samba import gensec , sd_utils
from samba . samdb import SamDB
2012-02-19 21:24:59 +11:00
from samba . credentials import Credentials , DONT_USE_KERBEROS
2011-02-27 12:24:45 +03:00
import samba . tests
from samba . tests import delete_force
# Command line handling: Samba, version, credential and subunit option
# groups plus one mandatory positional <host> argument.
parser = optparse.OptionParser("dirsync.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()

if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

host = args.pop()
if "://" not in host:
    # Bare host name: derive both the plain and the TLS URL from it.
    ldaphost = "ldap://%s" % host
    ldapshost = "ldaps://%s" % host
else:
    # A full URL was given: use it verbatim for both connections.
    # ldapshost must be defined here as well because
    # DirsyncBaseTests.setUp() connects through it.
    ldaphost = host
    ldapshost = host
    start = host.rindex("://")
    # Keep only what follows the scheme.  str.lstrip() takes a set of
    # characters, not an offset, so a slice is required here.
    host = host[start + 3:]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
#
# Tests start here
#
2018-07-30 18:20:39 +12:00
2011-02-27 12:24:45 +03:00
class DirsyncBaseTests(samba.tests.TestCase):
    """Shared fixture for the dirsync tests.

    setUp() opens a SamDB connection with the command line credentials
    and caches the domain DN, domain SID, configuration DN, a random
    password and an SDUtils helper on the instance.
    """

    def setUp(self):
        """Open the admin connection and cache the values derived from it."""
        super(DirsyncBaseTests, self).setUp()
        self.ldb_admin = SamDB(ldapshost,
                               credentials=creds,
                               session_info=system_session(lp),
                               lp=lp)
        self.base_dn = self.ldb_admin.domain_dn()
        self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
        # Password shared by every user the subclasses create.
        self.user_pass = samba.generate_random_password(12, 16)
        config_dn = self.ldb_admin.get_config_basedn()
        self.configuration_dn = config_dn.get_linearized()
        self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
        print("baseDN: %s" % self.base_dn)

    def get_user_dn(self, name):
        """Return the DN of user *name* inside CN=Users of the domain."""
        return "CN=%s,CN=Users,%s" % (name, self.base_dn)

    def get_ldb_connection(self, target_username, target_password):
        """Return a SamDB connection to ``ldaphost`` bound as the given user.

        Domain, realm and workstation are copied from the command line
        credentials; sealing is requested and Kerberos is disabled.
        """
        creds_tmp = Credentials()
        creds_tmp.set_username(target_username)
        creds_tmp.set_password(target_password)
        creds_tmp.set_domain(creds.get_domain())
        creds_tmp.set_realm(creds.get_realm())
        creds_tmp.set_workstation(creds.get_workstation())
        features = creds_tmp.get_gensec_features() | gensec.FEATURE_SEAL
        creds_tmp.set_gensec_features(features)
        # kinit is too expensive to use in a tight loop
        creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)
        return SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
2018-07-30 18:19:49 +12:00
# tests of the dirsync control
2011-02-27 12:24:45 +03:00
class SimpleDirsyncTests(DirsyncBaseTests):
    """Dirsync control tests using an admin, a dirsync-enabled and a plain user.

    setUp() creates three accounts: ``test_dirsync_user`` (granted
    GUID_DRS_GET_CHANGES on the domain root), ``test_simple_user`` (no
    extra rights) and ``test_admin_user`` (member of Domain Admins).
    tearDown() removes them and restores the original security descriptor.
    """

    @staticmethod
    def _control_from_cookie(res, flags, max_size):
        """Return a dirsync control string reusing the cookie found in *res*.

        The string form of the first control attached to *res* is split on
        ":"; fields 1, 2 and 3 are overwritten with "1", *flags* and
        *max_size* (both strings) and every other field is kept as is.
        """
        ctl = str(res.controls[0]).split(":")
        ctl[1] = "1"
        ctl[2] = flags
        ctl[3] = max_size
        return ":".join(ctl)

    def setUp(self):
        """Create the test users and give the dirsync user GET_CHANGES."""
        super(SimpleDirsyncTests, self).setUp()
        self.dirsync_user = "test_dirsync_user"
        self.simple_user = "test_simple_user"
        self.admin_user = "test_admin_user"
        self.ouname = None

        for user in (self.dirsync_user, self.simple_user, self.admin_user):
            self.ldb_admin.newuser(user, self.user_pass)
        # Saved so that tearDown() can put the original descriptor back.
        self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)

        user_sid = self.sd_utils.get_object_sid(
            self.get_user_dn(self.dirsync_user))
        mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
                                   str(user_sid))
        self.sd_utils.dacl_add_ace(self.base_dn, mod)

        # add admins to the Domain Admins group
        self.ldb_admin.add_remove_group_members("Domain Admins",
                                                [self.admin_user],
                                                add_members_operation=True)

    def tearDown(self):
        """Delete everything setUp() and the tests may have created."""
        super(SimpleDirsyncTests, self).tearDown()
        for user in (self.dirsync_user, self.simple_user, self.admin_user):
            delete_force(self.ldb_admin, self.get_user_dn(user))
        if self.ouname:
            delete_force(self.ldb_admin, self.ouname)
        self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
        try:
            self.ldb_admin.deletegroup("testgroup")
        except Exception:
            # Best effort: "testgroup" only exists if a test created it
            # and did not get as far as deleting it again.
            pass

    def test_dirsync_supported(self):
        """Test the basic of the dirsync is supported"""
        self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user,
                                                   self.user_pass)
        self.ldb_simple = self.get_ldb_connection(self.simple_user,
                                                  self.user_pass)
        # Both of these must simply not raise.
        self.ldb_admin.search(self.base_dn,
                              expression="samaccountname=*",
                              controls=["dirsync:1:0:1"])
        self.ldb_dirsync.search(self.base_dn,
                                expression="samaccountname=*",
                                controls=["dirsync:1:0:1"])
        try:
            self.ldb_simple.search(self.base_dn,
                                   expression="samaccountname=*",
                                   controls=["dirsync:1:0:1"])
        except LdbError as err:
            self.assertIn("LDAP_INSUFFICIENT_ACCESS_RIGHTS", str(err))
        else:
            # Without this branch the test would pass when the
            # unprivileged search is (wrongly) allowed.
            self.fail("dirsync search by a user without GET_CHANGES "
                      "did not fail")

    def test_parentGUID_referrals(self):
        """Check that parentGUID of Configuration is the domain objectGUID."""
        res2 = self.ldb_admin.search(self.base_dn,
                                     scope=SCOPE_BASE,
                                     attrs=["objectGUID"])
        res = self.ldb_admin.search(self.base_dn,
                                    expression="name=Configuration",
                                    controls=["dirsync:1:0:1"])
        self.assertEqual(res2[0].get("objectGUID"),
                         res[0].get("parentGUID"))

    def test_ok_not_rootdc(self):
        """Test if it's ok to do dirsync on another NC that is not the root DC"""
        self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
                              expression="samaccountname=*",
                              controls=["dirsync:1:0:1"])

    def test_dirsync_errors(self):
        """Test if dirsync returns the correct LDAP errors in case of pb"""
        self.ldb_simple = self.get_ldb_connection(self.simple_user,
                                                  self.user_pass)
        self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user,
                                                   self.user_pass)
        users_dn = "CN=Users,%s" % self.base_dn
        insufficient = "LDAP_INSUFFICIENT_ACCESS_RIGHTS"
        unwilling = "LDAP_UNWILLING_TO_PERFORM"
        # (connection, search base, dirsync control, expected error text)
        cases = [
            (self.ldb_simple, self.base_dn, "dirsync:1:0:1", insufficient),
            (self.ldb_simple, users_dn, "dirsync:1:0:1", insufficient),
            (self.ldb_simple, users_dn, "dirsync:1:1:1", unwilling),
            (self.ldb_dirsync, users_dn, "dirsync:1:0:1", insufficient),
            (self.ldb_admin, users_dn, "dirsync:1:0:1", insufficient),
            (self.ldb_admin, users_dn, "dirsync:1:1:1", unwilling),
        ]
        # NOTE(review): as in the original test, a search that succeeds
        # is not reported as a failure here; only the text of an error
        # that is actually raised gets checked.
        for conn, base, control, expected in cases:
            try:
                conn.search(base,
                            expression="samaccountname=*",
                            controls=[control])
            except LdbError as err:
                print(err)
                self.assertIn(expected, str(err))

    def test_dirsync_attributes(self):
        """Check behavior with some attributes"""

        def search(expression, **kwargs):
            # Every query in this test uses the same base and control.
            return self.ldb_admin.search(self.base_dn,
                                         expression=expression,
                                         controls=["dirsync:1:0:1"],
                                         **kwargs)

        base_filter = "(distinguishedName=%s)" % str(self.base_dn)

        res = search("samaccountname=*")
        first = res.msgs[0]
        # Check that nTSecurityDescriptor is returned as it's the case
        # when doing dirsync
        self.assertIsNotNone(first.get("ntsecuritydescriptor"))
        # Check that non replicated attributes are not returned
        self.assertIsNone(first.get("badPwdCount"))
        # Check that non forward link are not returned
        self.assertIsNone(first.get("memberof"))

        # Asking for instanceType will return also objectGUID
        res = search("samaccountname=Administrator", attrs=["instanceType"])
        self.assertIsNotNone(res.msgs[0].get("objectGUID"))
        self.assertIsNotNone(res.msgs[0].get("instanceType"))

        # We don't return an entry if asked for objectGUID
        res = search(base_filter, attrs=["objectGUID"])
        self.assertEqual(len(res.msgs), 0)

        # a request on the root of a NC didn't return parentGUID
        res = search(base_filter, attrs=["name"])
        self.assertIsNotNone(res.msgs[0].get("objectGUID"))
        self.assertIsNotNone(res.msgs[0].get("name"))
        self.assertIsNone(res.msgs[0].get("parentGUID"))
        self.assertIsNotNone(res.msgs[0].get("instanceType"))

        # Asking for name will return also objectGUID and parentGUID
        # and instanceType and of course name
        res = search("samaccountname=Administrator", attrs=["name"])
        for attr in ("objectGUID", "name", "parentGUID", "instanceType"):
            self.assertIsNotNone(res.msgs[0].get(attr))

        # Asking for dn does not return only the DN: the entry has as
        # many attributes as when no attribute list is given
        res = search("samaccountname=Administrator", attrs=["dn"])
        res2 = search("samaccountname=Administrator")
        self.assertEqual(len(res.msgs[0]), len(res2.msgs[0]))

        # Asking for cn will return nothing on objects that have CN as RDN
        res = search("samaccountname=Administrator", attrs=["cn"])
        self.assertEqual(len(res.msgs), 0)

        # Asking for parentGUID will return nothing too
        res = search("samaccountname=Administrator", attrs=["parentGUID"])
        self.assertEqual(len(res.msgs), 0)

        # On an OU, cn is an ordinary attribute and is returned once set
        ouname = "OU=testou,%s" % self.base_dn
        self.ouname = ouname
        self.ldb_admin.create_ou(ouname)
        delta = Message()
        delta.dn = Dn(self.ldb_admin, ouname)
        delta["cn"] = MessageElement("test ou", FLAG_MOD_ADD, "cn")
        self.ldb_admin.modify(delta)
        res = search("name=testou", attrs=["cn"])
        self.assertEqual(len(res.msgs), 1)
        self.assertEqual(len(res.msgs[0]), 3)
        delete_force(self.ldb_admin, ouname)

    def test_dirsync_with_controls(self):
        """Check that dirsync can be combined with extended_dn and show_deleted

        There is no assertion: the test passes if the search does not raise.
        """
        self.ldb_admin.search(self.base_dn,
                              expression="(distinguishedName=%s)" % str(self.base_dn),
                              attrs=["name"],
                              controls=["dirsync:1:0:10000",
                                        "extended_dn:1",
                                        "show_deleted:1"])

    def test_dirsync_basenc(self):
        """Check that dirsync return correct information when dealing with the NC"""
        base_filter = "(distinguishedName=%s)" % str(self.base_dn)
        for attr in ("name", "ntSecurityDescriptor"):
            res = self.ldb_admin.search(self.base_dn,
                                        expression=base_filter,
                                        attrs=[attr],
                                        controls=["dirsync:1:0:10000"])
            self.assertEqual(len(res.msgs), 1)
            self.assertEqual(len(res.msgs[0]), 3)

    def test_dirsync_othernc(self):
        """Check that dirsync return information for entries that are normally referrals (ie. other NCs)"""
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(objectclass=configuration)",
                                    attrs=["name"],
                                    controls=["dirsync:1:0:10000"])
        self.assertEqual(len(res.msgs), 1)
        self.assertEqual(len(res.msgs[0]), 4)

        res = self.ldb_admin.search(self.base_dn,
                                    expression="(objectclass=configuration)",
                                    attrs=["ntSecurityDescriptor"],
                                    controls=["dirsync:1:0:10000"])
        self.assertEqual(len(res.msgs), 1)
        self.assertEqual(len(res.msgs[0]), 3)

        res = self.ldb_admin.search(self.base_dn,
                                    expression="(objectclass=domaindns)",
                                    attrs=["ntSecurityDescriptor"],
                                    controls=["dirsync:1:0:10000"])
        nb = len(res.msgs)

        # only sub nc returns a result when asked for objectGUID
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(objectclass=domaindns)",
                                    attrs=["objectGUID"],
                                    controls=["dirsync:1:0:0"])
        self.assertEqual(len(res.msgs), nb - 1)
        if nb > 1:
            self.assertIsNotNone(res.msgs[0].get("objectGUID"))
        else:
            # Only checks that this query does not raise.
            self.ldb_admin.search(self.base_dn,
                                  expression="(objectclass=configuration)",
                                  attrs=["objectGUID"],
                                  controls=["dirsync:1:0:0"])

    def test_dirsync_send_delta(self):
        """Check that dirsync return correct delta when sending the last cookie"""
        user_filter = "(&(samaccountname=test*)(!(isDeleted=*)))"
        ou_filter = "(&(objectClass=organizationalUnit)(!(isDeleted=*)))"

        def search(expression, control):
            return self.ldb_admin.search(self.base_dn,
                                         expression=expression,
                                         controls=[control])

        # Replaying the cookie straight away must yield nothing new.
        res = search(user_filter, "dirsync:1:0:10000")
        control = self._control_from_cookie(res, "0", "10000")
        res = search(user_filter, control)
        self.assertEqual(len(res), 0)

        res = search(ou_filter, "dirsync:1:0:100000")
        control2 = self._control_from_cookie(res, "0", "10000")

        # Let's create an OU
        ouname = "OU=testou2,%s" % self.base_dn
        self.ouname = ouname
        self.ldb_admin.create_ou(ouname)
        res = search(ou_filter, control2)
        self.assertEqual(len(res), 1)
        control3 = self._control_from_cookie(res, "0", "10000")

        delta = Message()
        delta.dn = Dn(self.ldb_admin, ouname)
        delta["cn"] = MessageElement("test ou", FLAG_MOD_ADD, "cn")
        self.ldb_admin.modify(delta)
        res = search(ou_filter, control3)
        self.assertEqual(len(res.msgs), 1)
        # 3 attributes: instanceType, cn and objectGUID
        self.assertEqual(len(res.msgs[0]), 3)

        delta = Message()
        delta.dn = Dn(self.ldb_admin, ouname)
        delta["cn"] = MessageElement([], FLAG_MOD_DELETE, "cn")
        self.ldb_admin.modify(delta)
        res = search(ou_filter, control3)
        self.assertEqual(len(res.msgs), 1)
        # So we won't have much attribute returned but instanceType and GUID
        # are.
        # 3 attributes: instanceType and objectGUID and cn but empty
        self.assertEqual(len(res.msgs[0]), 3)

        ouname = "OU=newouname,%s" % self.base_dn
        self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
        self.ouname = ouname
        # NOTE(review): the original test derived a fresh cookie at this
        # point but never used it; the query below still goes through
        # control3, exactly as before.
        res = search(ou_filter, control3)
        self.assertIsNotNone(res[0].get("parentGUID"))
        self.assertIsNotNone(res[0].get("name"))
        delete_force(self.ldb_admin, ouname)

    def test_dirsync_linkedattributes(self):
        """Check that dirsync reports changes of the member linked attribute"""
        # Let's search for members
        self.ldb_simple = self.get_ldb_connection(self.simple_user,
                                                  self.user_pass)
        res = self.ldb_simple.search(self.base_dn,
                                     expression="(name=Administrators)",
                                     controls=["dirsync:1:1:1"])
        size = len(res[0].get("member"))
        self.assertGreater(size, 0)
        control1 = self._control_from_cookie(res, "1", "10000")

        self.ldb_admin.add_remove_group_members("Administrators",
                                                [self.simple_user],
                                                add_members_operation=True)
        res = self.ldb_simple.search(self.base_dn,
                                     expression="(name=Administrators)",
                                     controls=[control1])
        self.assertEqual(len(res[0].get("member")), size + 1)
        control1 = self._control_from_cookie(res, "1", "10000")

        # remove the user from the group
        self.ldb_admin.add_remove_group_members("Administrators",
                                                [self.simple_user],
                                                add_members_operation=False)
        res = self.ldb_simple.search(self.base_dn,
                                     expression="(name=Administrators)",
                                     controls=[control1])
        self.assertEqual(len(res[0].get("member")), size)

        self.ldb_admin.newgroup("testgroup")
        self.ldb_admin.add_remove_group_members("testgroup",
                                                [self.simple_user],
                                                add_members_operation=True)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(name=testgroup)",
                                    controls=["dirsync:1:0:1"])
        self.assertEqual(len(res[0].get("member")), 1)
        self.assertTrue(res[0].get("member") != "")
        control1 = self._control_from_cookie(res, "0", "1")

        # Check that reasking the same question but with an updated cookie
        # didn't return any results.
        print(control1)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(name=testgroup)",
                                    controls=[control1])
        self.assertEqual(len(res), 0)
        control1 = self._control_from_cookie(res, "1", "10000")

        self.ldb_admin.add_remove_group_members("testgroup",
                                                [self.simple_user],
                                                add_members_operation=False)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(name=testgroup)",
                                    attrs=["member"],
                                    controls=[control1])
        # The group is deleted before asserting so that it is cleaned up
        # even when the assertion fails.
        self.ldb_admin.deletegroup("testgroup")
        self.assertEqual(len(res[0].get("member")), 0)

    def test_dirsync_deleted_items(self):
        """Check that dirsync returned deleted objects too"""
        # Let's create an OU
        ouname = "OU=testou3,%s" % self.base_dn
        self.ouname = ouname
        self.ldb_admin.create_ou(ouname)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
                                    controls=["dirsync:1:0:1"])
        guid = None
        for entry in res:
            if str(entry["name"]) == "testou3":
                guid = str(ndr_unpack(misc.GUID, entry.get("objectGUID")[0]))
        control1 = self._control_from_cookie(res, "0", "10000")

        # So now delete the object and check that
        # we can see the object but deleted when admin
        delete_force(self.ldb_admin, ouname)
        res = self.ldb_admin.search(self.base_dn,
                                    expression="(objectClass=organizationalUnit)",
                                    controls=[control1])
        self.assertEqual(len(res), 1)
        guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
        self.assertEqual(guid2, guid)
        self.assertTrue(res[0].get("isDeleted"))
        self.assertIsNotNone(res[0].get("name"))

    def test_cookie_from_others(self):
        """Search with a cookie whose guid1 field has been replaced

        There is no assertion: the test passes if the search does not raise.
        """
        ou_filter = "(&(objectClass=organizationalUnit)(!(isDeleted=*)))"
        res = self.ldb_admin.search(self.base_dn,
                                    expression=ou_filter,
                                    controls=["dirsync:1:0:1"])
        ctl = str(res.controls[0]).split(":")
        cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie,
                            base64.b64decode(str(ctl[4])))
        cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
        packed = base64.b64encode(ndr_pack(cookie)).decode('utf8')
        controls = ["dirsync:1:0:0:%s" % packed]
        self.ldb_admin.search(self.base_dn,
                              expression=ou_filter,
                              controls=controls)
2014-11-01 20:27:30 -07:00
2011-02-27 12:24:45 +03:00
class ExtendedDirsyncTests(SimpleDirsyncTests):
    """Further dirsync tests: ranged members, control flags and extended DNs.

    Inherits the users and cleanup of SimpleDirsyncTests and overrides
    test_dirsync_linkedattributes and test_dirsync_deleted_items.
    """

    @staticmethod
    def _control_from_cookie(res, flags, max_size):
        """Return a dirsync control string reusing the cookie found in *res*.

        The string form of the first control attached to *res* is split on
        ":"; fields 1, 2 and 3 are overwritten with "1", *flags* and
        *max_size* (both strings) and every other field is kept as is.
        """
        ctl = str(res.controls[0]).split(":")
        ctl[1] = "1"
        ctl[2] = flags
        ctl[3] = max_size
        return ":".join(ctl)

    def test_dirsync_linkedattributes_range(self):
        """Check that a ranged member request comes back as plain member"""
        self.ldb_simple = self.get_ldb_connection(self.simple_user,
                                                  self.user_pass)
        res = self.ldb_admin.search(self.base_dn,
                                    attrs=["member;range=1-1"],
                                    expression="(name=Administrators)",
                                    controls=["dirsync:1:0:0"])
        self.assertGreater(len(res), 0)
        self.assertIsNone(res[0].get("member;range=1-1"))
        self.assertIsNotNone(res[0].get("member"))
        self.assertGreater(len(res[0].get("member")), 0)

    def test_dirsync_linkedattributes_range_user(self):
        """Check that the plain user is refused a ranged dirsync search"""
        self.ldb_simple = self.get_ldb_connection(self.simple_user,
                                                  self.user_pass)
        try:
            self.ldb_simple.search(self.base_dn,
                                   attrs=["member;range=1-1"],
                                   expression="(name=Administrators)",
                                   controls=["dirsync:1:0:0"])
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS)
        else:
            self.fail()

    def test_dirsync_linkedattributes(self):
        """Check member reporting when control flag 0x80000000 is set"""
        # 2147483648 == 0x80000000
        # NOTE(review): presumably LDAP_DIRSYNC_INCREMENTAL_VALUES from
        # MS-ADTS - confirm.
        flag_incr_linked = 2147483648
        flags = "%d" % flag_incr_linked
        added = "member;range=1-1"
        removed = "member;range=0-0"

        def search_admins(control, **kwargs):
            return self.ldb_admin.search(self.base_dn,
                                         expression="(name=Administrators)",
                                         controls=[control],
                                         **kwargs)

        self.ldb_simple = self.get_ldb_connection(self.simple_user,
                                                  self.user_pass)
        res = search_admins("dirsync:1:%d:1" % flag_incr_linked,
                            attrs=["member"])
        self.assertIsNotNone(res[0].get(added))
        self.assertGreater(len(res[0].get(added)), 0)
        control1 = self._control_from_cookie(res, flags, "10000")

        self.ldb_admin.add_remove_group_members("Administrators",
                                                [self.simple_user],
                                                add_members_operation=True)
        self.ldb_admin.add_remove_group_members("Administrators",
                                                [self.dirsync_user],
                                                add_members_operation=True)
        res = search_admins(control1)
        self.assertEqual(len(res[0].get(added)), 2)
        control1 = self._control_from_cookie(res, flags, "10000")

        # remove the user from the group
        self.ldb_admin.add_remove_group_members("Administrators",
                                                [self.simple_user],
                                                add_members_operation=False)
        res = search_admins(control1)
        self.assertIsNone(res[0].get(added))
        self.assertEqual(len(res[0].get(removed)), 1)
        control2 = self._control_from_cookie(res, flags, "10000")

        self.ldb_admin.add_remove_group_members("Administrators",
                                                [self.dirsync_user],
                                                add_members_operation=False)
        # Since control2 only the second removal is reported ...
        res = search_admins(control2)
        self.assertIsNone(res[0].get(added))
        self.assertEqual(len(res[0].get(removed)), 1)

        # ... whereas the older cookie in control1 reports both removals.
        res = search_admins(control1)
        self.assertIsNone(res[0].get(added))
        self.assertEqual(len(res[0].get(removed)), 2)

    def test_dirsync_extended_dn(self):
        """Check that dirsync works together with the extended_dn control"""
        # Let's search for members
        self.ldb_simple = self.get_ldb_connection(self.simple_user,
                                                  self.user_pass)
        res = self.ldb_simple.search(self.base_dn,
                                     expression="(name=Administrators)",
                                     controls=["dirsync:1:1:1"])
        size = len(res[0].get("member"))
        self.assertGreater(size, 0)
        plain_member = res[0]["member"][0]

        # (extended_dn control, byte string expected to open the SID part)
        variants = [
            ("extended_dn:1:1", b">;<SID=S-1-5-21-"),
            ("extended_dn:1:0", b">;<SID=010500000000000515"),
        ]
        for extended_dn, sid_marker in variants:
            res_ex = self.ldb_simple.search(self.base_dn,
                                            expression="(name=Administrators)",
                                            controls=["dirsync:1:1:1",
                                                      extended_dn])
            self.assertGreater(len(res_ex[0].get("member")), 0)
            self.assertEqual(len(res_ex[0].get("member")), size)
            ex_member = res_ex[0]["member"][0]
            # The extended form must still contain the plain DN.
            self.assertIn(plain_member, ex_member)
            self.assertIn(b"<GUID=", ex_member)
            self.assertIn(sid_marker, ex_member)

    def test_dirsync_deleted_items(self):
        """Check that dirsync returned deleted objects too"""
        # Let's create an OU
        self.ldb_simple = self.get_ldb_connection(self.simple_user,
                                                  self.user_pass)
        ouname = "OU=testou3,%s" % self.base_dn
        self.ouname = ouname
        self.ldb_admin.create_ou(ouname)

        # Specify LDAP_DIRSYNC_OBJECT_SECURITY
        res = self.ldb_simple.search(self.base_dn,
                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
                                     controls=["dirsync:1:1:1"])
        guid = None
        for entry in res:
            if str(entry["name"]) == "testou3":
                guid = str(ndr_unpack(misc.GUID, entry.get("objectGUID")[0]))
        self.assertIsNotNone(guid)
        control1 = self._control_from_cookie(res, "1", "10000")

        # So now delete the object and check that
        # we can see the object but deleted when admin
        # we just see the objectGUID when simple user
        delete_force(self.ldb_admin, ouname)
        res = self.ldb_simple.search(self.base_dn,
                                     expression="(objectClass=organizationalUnit)",
                                     controls=[control1])
        self.assertEqual(len(res), 1)
        guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
        self.assertEqual(guid2, guid)
        self.assertEqual(str(res[0].dn), "")
2014-11-01 21:17:39 -07:00
# The loadparm context and credentials are only set up when tests are
# actually going to run, i.e. not when the "listtests" option is set.
if not getattr(opts, "listtests", False):
    lp = sambaopts.get_loadparm()
    samba.tests.cmdline_credentials = credopts.get_credentials(lp)


TestProgram(module=__name__, opts=subunitopts)