2018-04-04 02:59:41 +03:00
# Tests for SamDb password change audit logging.
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import print_function
""" Tests for the SamDb logging of password changes.
"""
import samba . tests
from samba . dcerpc . messaging import MSG_DSDB_LOG , DSDB_EVENT_NAME
2018-06-06 16:30:44 +03:00
from ldb import ERR_NO_SUCH_OBJECT
2018-04-04 02:59:41 +03:00
from samba . samdb import SamDB
from samba . auth import system_session
import os
import time
from samba . tests . audit_log_base import AuditLogTestBase
from samba . tests import delete_force
from samba . net import Net
import samba
from samba . dcerpc import security , lsa
USER_NAME = " auditlogtestuser "
USER_PASS = samba . generate_random_password ( 32 , 32 )
class AuditLogDsdbTests ( AuditLogTestBase ) :
def setUp ( self ) :
self . message_type = MSG_DSDB_LOG
self . event_type = DSDB_EVENT_NAME
super ( AuditLogDsdbTests , self ) . setUp ( )
self . remoteAddress = os . environ [ " CLIENT_IP " ]
self . server_ip = os . environ [ " SERVER_IP " ]
host = " ldap:// %s " % os . environ [ " SERVER " ]
self . ldb = SamDB ( url = host ,
session_info = system_session ( ) ,
credentials = self . get_credentials ( ) ,
lp = self . get_loadparm ( ) )
self . server = os . environ [ " SERVER " ]
# Gets back the basedn
self . base_dn = self . ldb . domain_dn ( )
# Get the old "dSHeuristics" if it was set
dsheuristics = self . ldb . get_dsheuristics ( )
# Set the "dSHeuristics" to activate the correct "userPassword"
# behaviour
self . ldb . set_dsheuristics ( " 000000001 " )
# Reset the "dSHeuristics" as they were before
self . addCleanup ( self . ldb . set_dsheuristics , dsheuristics )
# Get the old "minPwdAge"
minPwdAge = self . ldb . get_minPwdAge ( )
# Set it temporarily to "0"
self . ldb . set_minPwdAge ( " 0 " )
self . base_dn = self . ldb . domain_dn ( )
# Reset the "minPwdAge" as it was before
self . addCleanup ( self . ldb . set_minPwdAge , minPwdAge )
# (Re)adds the test user USER_NAME with password USER_PASS
delete_force ( self . ldb , " cn= " + USER_NAME + " ,cn=users, " + self . base_dn )
self . ldb . add ( {
" dn " : " cn= " + USER_NAME + " ,cn=users, " + self . base_dn ,
" objectclass " : " user " ,
" sAMAccountName " : USER_NAME ,
" userPassword " : USER_PASS
} )
#
# Discard the messages from the setup code
#
def discardSetupMessages ( self , dn ) :
2018-06-06 16:30:44 +03:00
self . waitForMessages ( 2 , dn = dn )
2018-04-04 02:59:41 +03:00
self . discardMessages ( )
def tearDown ( self ) :
self . discardMessages ( )
super ( AuditLogDsdbTests , self ) . tearDown ( )
2018-06-06 16:30:44 +03:00
def haveExpectedTxn ( self , expected ) :
if self . context [ " txnMessage " ] is not None :
txn = self . context [ " txnMessage " ] [ " dsdbTransaction " ]
if txn [ " transactionId " ] == expected :
return True
return False
def waitForTransaction ( self , expected , connection = None ) :
2018-04-04 02:59:41 +03:00
""" Wait for a transaction message to arrive
The connection is passed through to keep the connection alive
until all the logging messages have been received .
"""
self . connection = connection
start_time = time . time ( )
2018-06-06 16:30:44 +03:00
while not self . haveExpectedTxn ( expected ) :
2018-04-04 02:59:41 +03:00
self . msg_ctx . loop_once ( 0.1 )
if time . time ( ) - start_time > 1 :
self . connection = None
return " "
self . connection = None
return self . context [ " txnMessage " ]
def test_net_change_password ( self ) :
dn = " CN= " + USER_NAME + " ,CN=Users, " + self . base_dn
self . discardSetupMessages ( dn )
creds = self . insta_creds ( template = self . get_credentials ( ) )
lp = self . get_loadparm ( )
net = Net ( creds , lp , server = self . server )
password = " newPassword!!42 "
2018-11-28 17:06:54 +03:00
net . change_password ( newpassword = password ,
2018-04-04 02:59:41 +03:00
username = USER_NAME ,
oldpassword = USER_PASS )
messages = self . waitForMessages ( 1 , net , dn = dn )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 1 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
audit = messages [ 0 ] [ " dsdbChange " ]
self . assertEquals ( " Modify " , audit [ " operation " ] )
self . assertFalse ( audit [ " performedAsSystem " ] )
self . assertTrue ( dn . lower ( ) , audit [ " dn " ] . lower ( ) )
self . assertRegexpMatches ( audit [ " remoteAddress " ] ,
self . remoteAddress )
session_id = self . get_session ( )
self . assertEquals ( session_id , audit [ " sessionId " ] )
2018-06-25 23:29:46 +03:00
# We skip the check for self.get_service_description() as this
# is subject to a race between smbd and the s4 rpc_server code
# as to which will set the description as it is DCE/RPC over SMB
2018-04-04 02:59:41 +03:00
self . assertTrue ( self . is_guid ( audit [ " transactionId " ] ) )
attributes = audit [ " attributes " ]
self . assertEquals ( 1 , len ( attributes ) )
actions = attributes [ " clearTextPassword " ] [ " actions " ]
self . assertEquals ( 1 , len ( actions ) )
self . assertTrue ( actions [ 0 ] [ " redacted " ] )
self . assertEquals ( " replace " , actions [ 0 ] [ " action " ] )
def test_net_set_password ( self ) :
dn = " CN= " + USER_NAME + " ,CN=Users, " + self . base_dn
self . discardSetupMessages ( dn )
creds = self . insta_creds ( template = self . get_credentials ( ) )
lp = self . get_loadparm ( )
net = Net ( creds , lp , server = self . server )
password = " newPassword!!42 "
domain = lp . get ( " workgroup " )
2018-11-28 17:06:54 +03:00
net . set_password ( newpassword = password ,
2018-04-04 02:59:41 +03:00
account_name = USER_NAME ,
domain_name = domain )
messages = self . waitForMessages ( 1 , net , dn = dn )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 1 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
audit = messages [ 0 ] [ " dsdbChange " ]
self . assertEquals ( " Modify " , audit [ " operation " ] )
self . assertFalse ( audit [ " performedAsSystem " ] )
self . assertEquals ( dn , audit [ " dn " ] )
self . assertRegexpMatches ( audit [ " remoteAddress " ] ,
self . remoteAddress )
session_id = self . get_session ( )
self . assertEquals ( session_id , audit [ " sessionId " ] )
2018-06-25 23:29:46 +03:00
# We skip the check for self.get_service_description() as this
# is subject to a race between smbd and the s4 rpc_server code
# as to which will set the description as it is DCE/RPC over SMB
2018-04-04 02:59:41 +03:00
self . assertTrue ( self . is_guid ( audit [ " transactionId " ] ) )
attributes = audit [ " attributes " ]
self . assertEquals ( 1 , len ( attributes ) )
actions = attributes [ " clearTextPassword " ] [ " actions " ]
self . assertEquals ( 1 , len ( actions ) )
self . assertTrue ( actions [ 0 ] [ " redacted " ] )
self . assertEquals ( " replace " , actions [ 0 ] [ " action " ] )
def test_ldap_change_password ( self ) :
dn = " cn= " + USER_NAME + " ,cn=users, " + self . base_dn
self . discardSetupMessages ( dn )
new_password = samba . generate_random_password ( 32 , 32 )
dn = " cn= " + USER_NAME + " ,cn=users, " + self . base_dn
self . ldb . modify_ldif (
" dn: " + dn + " \n " +
" changetype: modify \n " +
" delete: userPassword \n " +
" userPassword: " + USER_PASS + " \n " +
" add: userPassword \n " +
" userPassword: " + new_password + " \n " )
messages = self . waitForMessages ( 1 )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 1 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
audit = messages [ 0 ] [ " dsdbChange " ]
self . assertEquals ( " Modify " , audit [ " operation " ] )
self . assertFalse ( audit [ " performedAsSystem " ] )
self . assertEquals ( dn , audit [ " dn " ] )
self . assertRegexpMatches ( audit [ " remoteAddress " ] ,
self . remoteAddress )
self . assertTrue ( self . is_guid ( audit [ " sessionId " ] ) )
session_id = self . get_session ( )
self . assertEquals ( session_id , audit [ " sessionId " ] )
service_description = self . get_service_description ( )
self . assertEquals ( service_description , " LDAP " )
attributes = audit [ " attributes " ]
self . assertEquals ( 1 , len ( attributes ) )
actions = attributes [ " userPassword " ] [ " actions " ]
self . assertEquals ( 2 , len ( actions ) )
self . assertTrue ( actions [ 0 ] [ " redacted " ] )
self . assertEquals ( " delete " , actions [ 0 ] [ " action " ] )
self . assertTrue ( actions [ 1 ] [ " redacted " ] )
self . assertEquals ( " add " , actions [ 1 ] [ " action " ] )
def test_ldap_replace_password ( self ) :
dn = " cn= " + USER_NAME + " ,cn=users, " + self . base_dn
self . discardSetupMessages ( dn )
new_password = samba . generate_random_password ( 32 , 32 )
self . ldb . modify_ldif (
" dn: " + dn + " \n " +
" changetype: modify \n " +
" replace: userPassword \n " +
" userPassword: " + new_password + " \n " )
messages = self . waitForMessages ( 1 , dn = dn )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 1 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
audit = messages [ 0 ] [ " dsdbChange " ]
self . assertEquals ( " Modify " , audit [ " operation " ] )
self . assertFalse ( audit [ " performedAsSystem " ] )
self . assertTrue ( dn . lower ( ) , audit [ " dn " ] . lower ( ) )
self . assertRegexpMatches ( audit [ " remoteAddress " ] ,
self . remoteAddress )
self . assertTrue ( self . is_guid ( audit [ " sessionId " ] ) )
session_id = self . get_session ( )
self . assertEquals ( session_id , audit [ " sessionId " ] )
service_description = self . get_service_description ( )
self . assertEquals ( service_description , " LDAP " )
self . assertTrue ( self . is_guid ( audit [ " transactionId " ] ) )
attributes = audit [ " attributes " ]
self . assertEquals ( 1 , len ( attributes ) )
actions = attributes [ " userPassword " ] [ " actions " ]
self . assertEquals ( 1 , len ( actions ) )
self . assertTrue ( actions [ 0 ] [ " redacted " ] )
self . assertEquals ( " replace " , actions [ 0 ] [ " action " ] )
def test_ldap_add_user ( self ) :
# The setup code adds a user, so we check for the dsdb events
# generated by it.
dn = " cn= " + USER_NAME + " ,cn=users, " + self . base_dn
messages = self . waitForMessages ( 2 , dn = dn )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 2 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
audit = messages [ 1 ] [ " dsdbChange " ]
self . assertEquals ( " Add " , audit [ " operation " ] )
self . assertFalse ( audit [ " performedAsSystem " ] )
self . assertEquals ( dn , audit [ " dn " ] )
self . assertRegexpMatches ( audit [ " remoteAddress " ] ,
self . remoteAddress )
session_id = self . get_session ( )
self . assertEquals ( session_id , audit [ " sessionId " ] )
service_description = self . get_service_description ( )
self . assertEquals ( service_description , " LDAP " )
self . assertTrue ( self . is_guid ( audit [ " sessionId " ] ) )
self . assertTrue ( self . is_guid ( audit [ " transactionId " ] ) )
attributes = audit [ " attributes " ]
self . assertEquals ( 3 , len ( attributes ) )
actions = attributes [ " objectclass " ] [ " actions " ]
self . assertEquals ( 1 , len ( actions ) )
self . assertEquals ( " add " , actions [ 0 ] [ " action " ] )
self . assertEquals ( 1 , len ( actions [ 0 ] [ " values " ] ) )
self . assertEquals ( " user " , actions [ 0 ] [ " values " ] [ 0 ] [ " value " ] )
actions = attributes [ " sAMAccountName " ] [ " actions " ]
self . assertEquals ( 1 , len ( actions ) )
self . assertEquals ( " add " , actions [ 0 ] [ " action " ] )
self . assertEquals ( 1 , len ( actions [ 0 ] [ " values " ] ) )
self . assertEquals ( USER_NAME , actions [ 0 ] [ " values " ] [ 0 ] [ " value " ] )
actions = attributes [ " userPassword " ] [ " actions " ]
self . assertEquals ( 1 , len ( actions ) )
self . assertEquals ( " add " , actions [ 0 ] [ " action " ] )
self . assertTrue ( actions [ 0 ] [ " redacted " ] )
def test_samdb_delete_user ( self ) :
dn = " cn= " + USER_NAME + " ,cn=users, " + self . base_dn
self . discardSetupMessages ( dn )
self . ldb . deleteuser ( USER_NAME )
2018-06-06 16:30:44 +03:00
messages = self . waitForMessages ( 1 , dn = dn )
2018-04-04 02:59:41 +03:00
print ( " Received %d messages " % len ( messages ) )
2018-06-06 16:30:44 +03:00
self . assertEquals ( 1 ,
2018-04-04 02:59:41 +03:00
len ( messages ) ,
" Did not receive the expected number of messages " )
2018-06-06 16:30:44 +03:00
audit = messages [ 0 ] [ " dsdbChange " ]
2018-04-04 02:59:41 +03:00
self . assertEquals ( " Delete " , audit [ " operation " ] )
self . assertFalse ( audit [ " performedAsSystem " ] )
self . assertTrue ( dn . lower ( ) , audit [ " dn " ] . lower ( ) )
self . assertRegexpMatches ( audit [ " remoteAddress " ] ,
self . remoteAddress )
self . assertTrue ( self . is_guid ( audit [ " sessionId " ] ) )
2018-06-06 16:30:44 +03:00
self . assertEquals ( 0 , audit [ " statusCode " ] )
self . assertEquals ( " Success " , audit [ " status " ] )
2018-04-04 02:59:41 +03:00
session_id = self . get_session ( )
self . assertEquals ( session_id , audit [ " sessionId " ] )
service_description = self . get_service_description ( )
self . assertEquals ( service_description , " LDAP " )
2018-06-06 16:30:44 +03:00
transactionId = audit [ " transactionId " ]
message = self . waitForTransaction ( transactionId )
audit = message [ " dsdbTransaction " ]
self . assertEquals ( " commit " , audit [ " action " ] )
self . assertTrue ( self . is_guid ( audit [ " transactionId " ] ) )
self . assertTrue ( audit [ " duration " ] > 0 )
2018-04-04 02:59:41 +03:00
2018-06-06 16:30:44 +03:00
def test_samdb_delete_non_existent_dn ( self ) :
2018-04-04 02:59:41 +03:00
2018-06-06 16:30:44 +03:00
DOES_NOT_EXIST = " doesNotExist "
dn = " cn= " + USER_NAME + " ,cn=users, " + self . base_dn
self . discardSetupMessages ( dn )
2018-04-04 02:59:41 +03:00
2018-06-06 16:30:44 +03:00
dn = " cn= " + DOES_NOT_EXIST + " ,cn=users, " + self . base_dn
2018-04-04 02:59:41 +03:00
try :
2018-06-06 16:30:44 +03:00
self . ldb . delete ( dn )
self . fail ( " Exception not thrown " )
2018-04-04 02:59:41 +03:00
except Exception :
pass
2018-06-06 16:30:44 +03:00
messages = self . waitForMessages ( 1 )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 1 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
audit = messages [ 0 ] [ " dsdbChange " ]
self . assertEquals ( " Delete " , audit [ " operation " ] )
self . assertFalse ( audit [ " performedAsSystem " ] )
self . assertTrue ( dn . lower ( ) , audit [ " dn " ] . lower ( ) )
self . assertRegexpMatches ( audit [ " remoteAddress " ] ,
self . remoteAddress )
self . assertEquals ( ERR_NO_SUCH_OBJECT , audit [ " statusCode " ] )
self . assertEquals ( " No such object " , audit [ " status " ] )
self . assertTrue ( self . is_guid ( audit [ " sessionId " ] ) )
session_id = self . get_session ( )
self . assertEquals ( session_id , audit [ " sessionId " ] )
service_description = self . get_service_description ( )
self . assertEquals ( service_description , " LDAP " )
2018-04-04 02:59:41 +03:00
2018-06-06 16:30:44 +03:00
transactionId = audit [ " transactionId " ]
message = self . waitForTransaction ( transactionId )
2018-04-04 02:59:41 +03:00
audit = message [ " dsdbTransaction " ]
self . assertEquals ( " rollback " , audit [ " action " ] )
self . assertTrue ( self . is_guid ( audit [ " transactionId " ] ) )
2018-06-06 16:30:44 +03:00
self . assertTrue ( audit [ " duration " ] > 0 )
2018-04-04 02:59:41 +03:00
def test_create_and_delete_secret_over_lsa ( self ) :
dn = " cn=Test Secret,CN=System, " + self . base_dn
self . discardSetupMessages ( dn )
creds = self . insta_creds ( template = self . get_credentials ( ) )
lsa_conn = lsa . lsarpc (
" ncacn_np: %s " % self . server ,
self . get_loadparm ( ) ,
creds )
lsa_handle = lsa_conn . OpenPolicy2 (
system_name = " \\ " ,
attr = lsa . ObjectAttribute ( ) ,
access_mask = security . SEC_FLAG_MAXIMUM_ALLOWED )
secret_name = lsa . String ( )
secret_name . string = " G$Test "
lsa_conn . CreateSecret (
handle = lsa_handle ,
name = secret_name ,
access_mask = security . SEC_FLAG_MAXIMUM_ALLOWED )
messages = self . waitForMessages ( 1 , dn = dn )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 1 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
audit = messages [ 0 ] [ " dsdbChange " ]
self . assertEquals ( " Add " , audit [ " operation " ] )
self . assertTrue ( audit [ " performedAsSystem " ] )
self . assertTrue ( dn . lower ( ) , audit [ " dn " ] . lower ( ) )
self . assertRegexpMatches ( audit [ " remoteAddress " ] ,
self . remoteAddress )
self . assertTrue ( self . is_guid ( audit [ " sessionId " ] ) )
session_id = self . get_session ( )
self . assertEquals ( session_id , audit [ " sessionId " ] )
2018-06-25 23:29:46 +03:00
# We skip the check for self.get_service_description() as this
# is subject to a race between smbd and the s4 rpc_server code
# as to which will set the description as it is DCE/RPC over SMB
2018-04-04 02:59:41 +03:00
attributes = audit [ " attributes " ]
self . assertEquals ( 2 , len ( attributes ) )
object_class = attributes [ " objectClass " ]
self . assertEquals ( 1 , len ( object_class [ " actions " ] ) )
action = object_class [ " actions " ] [ 0 ]
self . assertEquals ( " add " , action [ " action " ] )
values = action [ " values " ]
self . assertEquals ( 1 , len ( values ) )
self . assertEquals ( " secret " , values [ 0 ] [ " value " ] )
cn = attributes [ " cn " ]
self . assertEquals ( 1 , len ( cn [ " actions " ] ) )
action = cn [ " actions " ] [ 0 ]
self . assertEquals ( " add " , action [ " action " ] )
values = action [ " values " ]
self . assertEquals ( 1 , len ( values ) )
self . assertEquals ( " Test Secret " , values [ 0 ] [ " value " ] )
#
# Now delete the secret.
self . discardMessages ( )
h = lsa_conn . OpenSecret (
handle = lsa_handle ,
name = secret_name ,
access_mask = security . SEC_FLAG_MAXIMUM_ALLOWED )
lsa_conn . DeleteObject ( h )
messages = self . waitForMessages ( 1 , dn = dn )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 1 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
dn = " cn=Test Secret,CN=System, " + self . base_dn
audit = messages [ 0 ] [ " dsdbChange " ]
self . assertEquals ( " Delete " , audit [ " operation " ] )
self . assertTrue ( audit [ " performedAsSystem " ] )
self . assertTrue ( dn . lower ( ) , audit [ " dn " ] . lower ( ) )
self . assertRegexpMatches ( audit [ " remoteAddress " ] ,
self . remoteAddress )
self . assertTrue ( self . is_guid ( audit [ " sessionId " ] ) )
session_id = self . get_session ( )
self . assertEquals ( session_id , audit [ " sessionId " ] )
2018-06-25 23:29:46 +03:00
# We skip the check for self.get_service_description() as this
# is subject to a race between smbd and the s4 rpc_server code
# as to which will set the description as it is DCE/RPC over SMB
2018-04-04 02:59:41 +03:00
def test_modify ( self ) :
dn = " cn= " + USER_NAME + " ,cn=users, " + self . base_dn
self . discardSetupMessages ( dn )
#
# Add an attribute value
#
self . ldb . modify_ldif (
" dn: " + dn + " \n " +
" changetype: modify \n " +
" add: carLicense \n " +
" carLicense: license-01 \n " )
messages = self . waitForMessages ( 1 , dn = dn )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 1 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
audit = messages [ 0 ] [ " dsdbChange " ]
self . assertEquals ( " Modify " , audit [ " operation " ] )
self . assertFalse ( audit [ " performedAsSystem " ] )
self . assertEquals ( dn , audit [ " dn " ] )
self . assertRegexpMatches ( audit [ " remoteAddress " ] ,
self . remoteAddress )
self . assertTrue ( self . is_guid ( audit [ " sessionId " ] ) )
session_id = self . get_session ( )
self . assertEquals ( session_id , audit [ " sessionId " ] )
service_description = self . get_service_description ( )
self . assertEquals ( service_description , " LDAP " )
attributes = audit [ " attributes " ]
self . assertEquals ( 1 , len ( attributes ) )
actions = attributes [ " carLicense " ] [ " actions " ]
self . assertEquals ( 1 , len ( actions ) )
self . assertEquals ( " add " , actions [ 0 ] [ " action " ] )
values = actions [ 0 ] [ " values " ]
self . assertEquals ( 1 , len ( values ) )
self . assertEquals ( " license-01 " , values [ 0 ] [ " value " ] )
#
# Add an another value to the attribute
#
self . discardMessages ( )
self . ldb . modify_ldif (
" dn: " + dn + " \n " +
" changetype: modify \n " +
" add: carLicense \n " +
" carLicense: license-02 \n " )
messages = self . waitForMessages ( 1 , dn = dn )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 1 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
attributes = messages [ 0 ] [ " dsdbChange " ] [ " attributes " ]
self . assertEquals ( 1 , len ( attributes ) )
actions = attributes [ " carLicense " ] [ " actions " ]
self . assertEquals ( 1 , len ( actions ) )
self . assertEquals ( " add " , actions [ 0 ] [ " action " ] )
values = actions [ 0 ] [ " values " ]
self . assertEquals ( 1 , len ( values ) )
self . assertEquals ( " license-02 " , values [ 0 ] [ " value " ] )
#
# Add an another two values to the attribute
#
self . discardMessages ( )
self . ldb . modify_ldif (
" dn: " + dn + " \n " +
" changetype: modify \n " +
" add: carLicense \n " +
" carLicense: license-03 \n " +
" carLicense: license-04 \n " )
messages = self . waitForMessages ( 1 , dn = dn )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 1 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
attributes = messages [ 0 ] [ " dsdbChange " ] [ " attributes " ]
self . assertEquals ( 1 , len ( attributes ) )
actions = attributes [ " carLicense " ] [ " actions " ]
self . assertEquals ( 1 , len ( actions ) )
self . assertEquals ( " add " , actions [ 0 ] [ " action " ] )
values = actions [ 0 ] [ " values " ]
self . assertEquals ( 2 , len ( values ) )
self . assertEquals ( " license-03 " , values [ 0 ] [ " value " ] )
self . assertEquals ( " license-04 " , values [ 1 ] [ " value " ] )
#
# delete two values to the attribute
#
self . discardMessages ( )
self . ldb . modify_ldif (
" dn: " + dn + " \n " +
" changetype: delete \n " +
" delete: carLicense \n " +
" carLicense: license-03 \n " +
" carLicense: license-04 \n " )
messages = self . waitForMessages ( 1 , dn = dn )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 1 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
attributes = messages [ 0 ] [ " dsdbChange " ] [ " attributes " ]
self . assertEquals ( 1 , len ( attributes ) )
actions = attributes [ " carLicense " ] [ " actions " ]
self . assertEquals ( 1 , len ( actions ) )
self . assertEquals ( " delete " , actions [ 0 ] [ " action " ] )
values = actions [ 0 ] [ " values " ]
self . assertEquals ( 2 , len ( values ) )
self . assertEquals ( " license-03 " , values [ 0 ] [ " value " ] )
self . assertEquals ( " license-04 " , values [ 1 ] [ " value " ] )
#
# replace two values to the attribute
#
self . discardMessages ( )
self . ldb . modify_ldif (
" dn: " + dn + " \n " +
" changetype: delete \n " +
" replace: carLicense \n " +
" carLicense: license-05 \n " +
" carLicense: license-06 \n " )
messages = self . waitForMessages ( 1 , dn = dn )
print ( " Received %d messages " % len ( messages ) )
self . assertEquals ( 1 ,
len ( messages ) ,
" Did not receive the expected number of messages " )
attributes = messages [ 0 ] [ " dsdbChange " ] [ " attributes " ]
self . assertEquals ( 1 , len ( attributes ) )
actions = attributes [ " carLicense " ] [ " actions " ]
self . assertEquals ( 1 , len ( actions ) )
self . assertEquals ( " replace " , actions [ 0 ] [ " action " ] )
values = actions [ 0 ] [ " values " ]
self . assertEquals ( 2 , len ( values ) )
self . assertEquals ( " license-05 " , values [ 0 ] [ " value " ] )
self . assertEquals ( " license-06 " , values [ 1 ] [ " value " ] )