2010-03-24 16:50:50 +11:00
#!/usr/bin/env python
2010-02-04 17:03:41 -02:00
# -*- coding: utf-8 -*-
import optparse
import sys
import os
2011-02-01 14:43:34 +11:00
sys . path . insert ( 0 , " bin/python " )
2010-06-30 10:57:37 +02:00
import samba
samba . ensure_external_module ( " testtools " , " testtools " )
2010-11-03 17:53:19 +01:00
samba . ensure_external_module ( " subunit " , " subunit/python " )
2010-02-04 17:03:41 -02:00
import samba . getopt as options
from samba . auth import system_session
2011-01-06 12:40:07 +01:00
from ldb import ( LdbError , ERR_NO_SUCH_OBJECT , Message ,
2010-04-04 00:30:34 +02:00
MessageElement , Dn , FLAG_MOD_REPLACE )
2010-04-08 22:07:42 +02:00
from samba . samdb import SamDB
2010-06-19 18:58:18 +02:00
import samba . tests
2010-11-04 13:16:11 +11:00
import samba . dsdb as dsdb
2010-02-04 17:03:41 -02:00
from subunit . run import SubunitTestRunner
import unittest
2010-11-10 13:35:30 +01:00
parser = optparse . OptionParser ( " urgent_replication.py [options] <host> " )
2010-02-04 17:03:41 -02:00
sambaopts = options . SambaOptions ( parser )
parser . add_option_group ( sambaopts )
parser . add_option_group ( options . VersionOptions ( parser ) )
# use command line creds if available
credopts = options . CredentialsOptions ( parser )
parser . add_option_group ( credopts )
opts , args = parser . parse_args ( )
if len ( args ) < 1 :
parser . print_usage ( )
sys . exit ( 1 )
host = args [ 0 ]
lp = sambaopts . get_loadparm ( )
creds = credopts . get_credentials ( lp )
2010-06-19 18:58:18 +02:00
class UrgentReplicationTests ( samba . tests . TestCase ) :
2010-02-04 17:03:41 -02:00
def delete_force ( self , ldb , dn ) :
try :
2010-08-01 17:44:50 +02:00
ldb . delete ( dn , [ " relax:0 " ] )
2010-02-04 17:03:41 -02:00
except LdbError , ( num , _ ) :
self . assertEquals ( num , ERR_NO_SUCH_OBJECT )
def setUp ( self ) :
2010-06-19 18:58:18 +02:00
super ( UrgentReplicationTests , self ) . setUp ( )
2010-02-04 17:03:41 -02:00
self . ldb = ldb
2010-11-18 16:06:46 +01:00
self . base_dn = ldb . domain_dn ( )
2010-02-04 17:03:41 -02:00
print " baseDN: %s \n " % self . base_dn
def test_nonurgent_object ( self ) :
2010-04-04 03:08:05 +02:00
""" Test if the urgent replication is not activated
when handling a non urgent object """
2010-02-04 17:03:41 -02:00
self . ldb . add ( {
" dn " : " cn=nonurgenttest,cn=users, " + self . base_dn ,
" objectclass " : " user " ,
" samaccountname " : " nonurgenttest " ,
2010-11-03 19:11:26 +01:00
" description " : " nonurgenttest description " } )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should not be enabled when creating
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertNotEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should not be enabled when modifying
2010-02-04 17:03:41 -02:00
m = Message ( )
m . dn = Dn ( ldb , " cn=nonurgenttest,cn=users, " + self . base_dn )
m [ " description " ] = MessageElement ( " new description " , FLAG_MOD_REPLACE ,
" description " )
ldb . modify ( m )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertNotEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should not be enabled when deleting
2010-02-04 17:03:41 -02:00
self . delete_force ( self . ldb , " cn=nonurgenttest,cn=users, " + self . base_dn )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertNotEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
def test_nTDSDSA_object ( self ) :
''' Test if the urgent replication is activated
when handling a nTDSDSA object '''
self . ldb . add ( {
2011-09-16 15:15:35 +10:00
" dn " : " cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites, %s " % self . ldb . get_config_basedn ( ) ,
2010-02-04 17:03:41 -02:00
" objectclass " : " server " ,
" cn " : " test server " ,
" name " : " test server " ,
2010-11-03 19:11:26 +01:00
" systemFlags " : " 50000000 " } , [ " relax:0 " ] )
2010-02-04 17:03:41 -02:00
self . ldb . add_ldif (
""" dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration, %s """ % ( self . base_dn ) + """
objectclass : nTDSDSA
cn : NTDS Settings test
options : 1
instanceType : 4
2010-11-03 19:11:26 +01:00
systemFlags : 33554432 """ , [ " relax:0 " ])
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when creation
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( " cn=Configuration, " + self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should NOT be enabled when modifying
2010-02-04 17:03:41 -02:00
m = Message ( )
m . dn = Dn ( ldb , " cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration, " + self . base_dn )
m [ " options " ] = MessageElement ( " 0 " , FLAG_MOD_REPLACE ,
" options " )
ldb . modify ( m )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( " cn=Configuration, " + self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertNotEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when deleting
2010-02-04 17:03:41 -02:00
self . delete_force ( self . ldb , " cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration, " + self . base_dn )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( " cn=Configuration, " + self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
self . delete_force ( self . ldb , " cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration, " + self . base_dn )
def test_crossRef_object ( self ) :
''' Test if the urgent replication is activated
when handling a crossRef object '''
self . ldb . add ( {
" dn " : " CN=test crossRef,CN=Partitions,CN=Configuration, " + self . base_dn ,
" objectClass " : " crossRef " ,
" cn " : " test crossRef " ,
2010-06-06 20:23:42 +02:00
" dnsRoot " : lp . get ( " realm " ) . lower ( ) ,
2010-02-04 17:03:41 -02:00
" instanceType " : " 4 " ,
" nCName " : self . base_dn ,
" showInAdvancedViewOnly " : " TRUE " ,
" name " : " test crossRef " ,
2010-11-04 01:36:57 +01:00
" systemFlags " : " 1 " } , [ " relax:0 " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when creating
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( " cn=Configuration, " + self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should NOT be enabled when modifying
2010-02-04 17:03:41 -02:00
m = Message ( )
m . dn = Dn ( ldb , " cn=test crossRef,CN=Partitions,CN=Configuration, " + self . base_dn )
m [ " systemFlags " ] = MessageElement ( " 0 " , FLAG_MOD_REPLACE ,
" systemFlags " )
ldb . modify ( m )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( " cn=Configuration, " + self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertNotEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when deleting
2010-02-04 17:03:41 -02:00
self . delete_force ( self . ldb , " cn=test crossRef,CN=Partitions,CN=Configuration, " + self . base_dn )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( " cn=Configuration, " + self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
def test_attributeSchema_object ( self ) :
''' Test if the urgent replication is activated
when handling an attributeSchema object '''
try :
self . ldb . add_ldif (
""" dn: CN=test attributeSchema,cn=Schema,CN=Configuration, %s """ % self . base_dn + """
objectClass : attributeSchema
cn : test attributeSchema
instanceType : 4
isSingleValued : FALSE
showInAdvancedViewOnly : FALSE
attributeID : 0.9 .2342 .19200300 .100 .1 .1
attributeSyntax : 2.5 .5 .12
adminDisplayName : test attributeSchema
adminDescription : test attributeSchema
oMSyntax : 64
systemOnly : FALSE
searchFlags : 8
lDAPDisplayName : test attributeSchema
2010-11-03 19:11:26 +01:00
name : test attributeSchema """ )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when creating
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( " cn=Schema,cn=Configuration, " + self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
except LdbError :
print " Not testing urgent replication when creating attributeSchema object ... \n "
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when modifying
2010-02-04 17:03:41 -02:00
m = Message ( )
m . dn = Dn ( ldb , " CN=test attributeSchema,CN=Schema,CN=Configuration, " + self . base_dn )
m [ " lDAPDisplayName " ] = MessageElement ( " updated test attributeSchema " , FLAG_MOD_REPLACE ,
" lDAPDisplayName " )
ldb . modify ( m )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( " cn=Schema,cn=Configuration, " + self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
def test_classSchema_object ( self ) :
''' Test if the urgent replication is activated
when handling a classSchema object '''
try :
self . ldb . add_ldif (
""" dn: CN=test classSchema,CN=Schema,CN=Configuration, %s """ % self . base_dn + """
objectClass : classSchema
cn : test classSchema
instanceType : 4
subClassOf : top
governsID : 1.2 .840 .113556 .1 .5 .999
rDNAttID : cn
showInAdvancedViewOnly : TRUE
adminDisplayName : test classSchema
adminDescription : test classSchema
objectClassCategory : 1
lDAPDisplayName : test classSchema
name : test classSchema
systemOnly : FALSE
systemPossSuperiors : dfsConfiguration
systemMustContain : msDFS - SchemaMajorVersion
defaultSecurityDescriptor : D : ( A ; ; RPWPCRCCDCLCLORCWOWDSDDTSW ; ; ; DA ) ( A ; ; RPWPCRCCD
CLCLORCWOWDSDDTSW ; ; ; SY ) ( A ; ; RPLCLORC ; ; ; AU ) ( A ; ; RPWPCRCCDCLCLORCWOWDSDDTSW ; ; ; CO )
systemFlags : 16
2010-11-03 19:11:26 +01:00
defaultHidingValue : TRUE """ )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when creating
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( " cn=Schema,cn=Configuration, " + self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
except LdbError :
print " Not testing urgent replication when creating classSchema object ... \n "
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when modifying
2010-02-04 17:03:41 -02:00
m = Message ( )
m . dn = Dn ( ldb , " CN=test classSchema,CN=Schema,CN=Configuration, " + self . base_dn )
m [ " lDAPDisplayName " ] = MessageElement ( " updated test classSchema " , FLAG_MOD_REPLACE ,
" lDAPDisplayName " )
ldb . modify ( m )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( " cn=Schema,cn=Configuration, " + self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
def test_secret_object ( self ) :
''' Test if the urgent replication is activated
when handling a secret object '''
self . ldb . add ( {
" dn " : " cn=test secret,cn=System, " + self . base_dn ,
" objectClass " : " secret " ,
" cn " : " test secret " ,
" name " : " test secret " ,
2011-01-14 18:45:32 +01:00
" currentValue " : " xxxxxxx " } )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when creating
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when modifying
2010-02-04 17:03:41 -02:00
m = Message ( )
m . dn = Dn ( ldb , " cn=test secret,cn=System, " + self . base_dn )
m [ " currentValue " ] = MessageElement ( " yyyyyyyy " , FLAG_MOD_REPLACE ,
" currentValue " )
ldb . modify ( m )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should NOT be enabled when deleting
2010-02-04 17:03:41 -02:00
self . delete_force ( self . ldb , " cn=test secret,cn=System, " + self . base_dn )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertNotEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
def test_rIDManager_object ( self ) :
''' Test if the urgent replication is activated
when handling a rIDManager object '''
self . ldb . add_ldif (
""" dn: CN=RID Manager test,CN=System, %s """ % self . base_dn + """
objectClass : rIDManager
cn : RID Manager test
instanceType : 4
showInAdvancedViewOnly : TRUE
name : RID Manager test
systemFlags : - 1946157056
isCriticalSystemObject : TRUE
rIDAvailablePool : 133001 - 1073741823 """ , [ " relax:0 " ])
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when creating
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when modifying
2010-02-04 17:03:41 -02:00
m = Message ( )
m . dn = Dn ( ldb , " CN=RID Manager test,CN=System, " + self . base_dn )
m [ " systemFlags " ] = MessageElement ( " 0 " , FLAG_MOD_REPLACE ,
" systemFlags " )
ldb . modify ( m )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should NOT be enabled when deleting
2010-02-04 17:03:41 -02:00
self . delete_force ( self . ldb , " CN=RID Manager test,CN=System, " + self . base_dn )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertNotEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
def test_urgent_attributes ( self ) :
''' Test if the urgent replication is activated
when handling urgent attributes of an object '''
self . ldb . add ( {
" dn " : " cn=user UrgAttr test,cn=users, " + self . base_dn ,
" objectclass " : " user " ,
" samaccountname " : " user UrgAttr test " ,
2010-11-04 13:16:11 +11:00
" userAccountControl " : str ( dsdb . UF_NORMAL_ACCOUNT ) ,
2010-02-04 17:03:41 -02:00
" lockoutTime " : " 0 " ,
" pwdLastSet " : " 0 " ,
2010-11-03 19:11:26 +01:00
" description " : " urgent attributes test description " } )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should NOT be enabled when creating
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertNotEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when modifying userAccountControl
2010-02-04 17:03:41 -02:00
m = Message ( )
m . dn = Dn ( ldb , " cn=user UrgAttr test,cn=users, " + self . base_dn )
2010-11-04 13:16:11 +11:00
m [ " userAccountControl " ] = MessageElement ( str ( dsdb . UF_NORMAL_ACCOUNT + dsdb . UF_SMARTCARD_REQUIRED ) , FLAG_MOD_REPLACE ,
2010-02-04 17:03:41 -02:00
" userAccountControl " )
ldb . modify ( m )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when modifying lockoutTime
2010-02-04 17:03:41 -02:00
m = Message ( )
m . dn = Dn ( ldb , " cn=user UrgAttr test,cn=users, " + self . base_dn )
m [ " lockoutTime " ] = MessageElement ( " 1 " , FLAG_MOD_REPLACE ,
" lockoutTime " )
ldb . modify ( m )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should be enabled when modifying pwdLastSet
2010-02-04 17:03:41 -02:00
m = Message ( )
m . dn = Dn ( ldb , " cn=user UrgAttr test,cn=users, " + self . base_dn )
m [ " pwdLastSet " ] = MessageElement ( " 1 " , FLAG_MOD_REPLACE ,
" pwdLastSet " )
ldb . modify ( m )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should NOT be enabled when modifying a not-urgent
# attribute
2010-02-04 17:03:41 -02:00
m = Message ( )
m . dn = Dn ( ldb , " cn=user UrgAttr test,cn=users, " + self . base_dn )
m [ " description " ] = MessageElement ( " updated urgent attributes test description " ,
FLAG_MOD_REPLACE , " description " )
ldb . modify ( m )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertNotEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
2010-04-04 03:08:05 +02:00
# urgent replication should NOT be enabled when deleting
2010-02-04 17:03:41 -02:00
self . delete_force ( self . ldb , " cn=user UrgAttr test,cn=users, " + self . base_dn )
2010-04-08 22:07:42 +02:00
res = self . ldb . load_partition_usn ( self . base_dn )
2010-11-03 19:11:26 +01:00
self . assertNotEquals ( res [ " uSNHighest " ] , res [ " uSNUrgent " ] )
2010-02-04 17:03:41 -02:00
if not " :// " in host :
if os . path . isfile ( host ) :
host = " tdb:// %s " % host
else :
host = " ldap:// %s " % host
2010-12-15 15:16:54 +02:00
ldb = SamDB ( host , credentials = creds , session_info = system_session ( lp ) , lp = lp ,
2010-04-08 22:07:42 +02:00
global_schema = False )
2010-02-04 17:03:41 -02:00
runner = SubunitTestRunner ( )
rc = 0
if not runner . run ( unittest . makeSuite ( UrgentReplicationTests ) ) . wasSuccessful ( ) :
rc = 1
sys . exit ( rc )