mirror of
https://github.com/samba-team/samba.git
synced 2025-01-11 05:18:09 +03:00
s4:dsdb/tests: improve the RestoreUserObjectTestCase test
We verify attributes, values and their replication metadata after each step (add, delete, reanimate). Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
parent
cf19ab651a
commit
55932d7ecd
@ -24,6 +24,12 @@ import unittest
|
||||
sys.path.insert(0, "bin/python")
|
||||
import samba
|
||||
|
||||
from samba.ndr import ndr_unpack, ndr_print
|
||||
from samba.dcerpc import misc
|
||||
from samba.dcerpc import security
|
||||
from samba.dcerpc import drsblobs
|
||||
from samba.dcerpc.drsuapi import *
|
||||
|
||||
import samba.tests
|
||||
from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
|
||||
MessageElement, LdbError,
|
||||
@ -61,9 +67,10 @@ class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
|
||||
def GUID_string(self, guid):
|
||||
return self.samdb.schema_format_value("objectGUID", guid)
|
||||
|
||||
def search_guid(self, guid):
|
||||
def search_guid(self, guid, attrs=["*"]):
|
||||
res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
|
||||
scope=SCOPE_BASE, controls=["show_deleted:1"])
|
||||
scope=SCOPE_BASE, attrs=attrs,
|
||||
controls=["show_deleted:1"])
|
||||
self.assertEquals(len(res), 1)
|
||||
return res[0]
|
||||
|
||||
@ -133,6 +140,39 @@ class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
|
||||
"Unexpected value (%s) for '%s', expected (%s)" % (
|
||||
str(actual_val), name, expected_val))
|
||||
|
||||
def _check_metadata(self, metadata, expected):
|
||||
repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(metadata[0]))
|
||||
|
||||
repl_array = []
|
||||
for o in repl.ctr.array:
|
||||
repl_array.append((o.attid, o.version))
|
||||
repl_set = set(repl_array)
|
||||
|
||||
expected_set = set(expected)
|
||||
self.assertEqual(len(repl_set), len(expected),
|
||||
"Unexpected metadata, missing from expected (%s), extra (%s)), repl: \n%s" % (
|
||||
str(expected_set.difference(repl_set)),
|
||||
str(repl_set.difference(expected_set)),
|
||||
ndr_print(repl)))
|
||||
|
||||
i = 0
|
||||
for o in repl.ctr.array:
|
||||
e = expected[i]
|
||||
(attid, version) = e
|
||||
self.assertEquals(attid, o.attid,
|
||||
"(LDAP) Wrong attid "
|
||||
"for expected value %d, wanted 0x%08x got 0x%08x, "
|
||||
"repl: \n%s"
|
||||
% (i, attid, o.attid, ndr_print(repl)))
|
||||
# Allow version to be skipped when it does not matter
|
||||
if version is not None:
|
||||
self.assertEquals(o.version, version,
|
||||
"(LDAP) Wrong version for expected value %d, "
|
||||
"attid 0x%08x, "
|
||||
"wanted %d got %d, repl: \n%s"
|
||||
% (i, o.attid,
|
||||
version, o.version, ndr_print(repl)))
|
||||
i = i + 1
|
||||
|
||||
@staticmethod
|
||||
def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None):
|
||||
@ -314,7 +354,7 @@ class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase):
|
||||
class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
|
||||
"""Test cases for delete/reanimate user objects"""
|
||||
|
||||
def _expected_user_attributes(self, username, user_dn, category):
|
||||
def _expected_user_add_attributes(self, username, user_dn, category):
|
||||
return {'dn': user_dn,
|
||||
'objectClass': '**',
|
||||
'cn': username,
|
||||
@ -335,17 +375,149 @@ class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
|
||||
'lastLogoff': '0',
|
||||
'pwdLastSet': '0',
|
||||
'primaryGroupID': '513',
|
||||
'operatorCount': '0',
|
||||
'objectSid': '**',
|
||||
'adminCount': '0',
|
||||
'accountExpires': '9223372036854775807',
|
||||
'logonCount': '0',
|
||||
'sAMAccountName': username,
|
||||
'sAMAccountType': '805306368',
|
||||
'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
|
||||
}
|
||||
|
||||
def _expected_user_add_metadata(self):
|
||||
return [
|
||||
(DRSUAPI_ATTID_objectClass, 1),
|
||||
(DRSUAPI_ATTID_cn, 1),
|
||||
(DRSUAPI_ATTID_instanceType, 1),
|
||||
(DRSUAPI_ATTID_whenCreated, 1),
|
||||
(DRSUAPI_ATTID_ntSecurityDescriptor, 1),
|
||||
(DRSUAPI_ATTID_name, 1),
|
||||
(DRSUAPI_ATTID_userAccountControl, None),
|
||||
(DRSUAPI_ATTID_codePage, 1),
|
||||
(DRSUAPI_ATTID_countryCode, 1),
|
||||
(DRSUAPI_ATTID_dBCSPwd, 1),
|
||||
(DRSUAPI_ATTID_logonHours, 1),
|
||||
(DRSUAPI_ATTID_unicodePwd, 1),
|
||||
(DRSUAPI_ATTID_ntPwdHistory, 1),
|
||||
(DRSUAPI_ATTID_pwdLastSet, 1),
|
||||
(DRSUAPI_ATTID_primaryGroupID, 1),
|
||||
(DRSUAPI_ATTID_objectSid, 1),
|
||||
(DRSUAPI_ATTID_accountExpires, 1),
|
||||
(DRSUAPI_ATTID_lmPwdHistory, 1),
|
||||
(DRSUAPI_ATTID_sAMAccountName, 1),
|
||||
(DRSUAPI_ATTID_sAMAccountType, 1),
|
||||
(DRSUAPI_ATTID_objectCategory, 1)]
|
||||
|
||||
def _expected_user_del_attributes(self, username, _guid, _sid):
|
||||
guid = ndr_unpack(misc.GUID, _guid)
|
||||
dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
|
||||
cn = "%s\nDEL:%s" % (username, guid)
|
||||
return {'dn': dn,
|
||||
'objectClass': '**',
|
||||
'cn': cn,
|
||||
'distinguishedName': dn,
|
||||
'isDeleted': 'TRUE',
|
||||
'isRecycled': 'TRUE',
|
||||
'instanceType': '4',
|
||||
'whenCreated': '**',
|
||||
'whenChanged': '**',
|
||||
'uSNCreated': '**',
|
||||
'uSNChanged': '**',
|
||||
'name': cn,
|
||||
'objectGUID': _guid,
|
||||
'userAccountControl': '546',
|
||||
'objectSid': _sid,
|
||||
'sAMAccountName': username,
|
||||
'lastKnownParent': 'CN=Users,%s' % self.base_dn,
|
||||
}
|
||||
|
||||
def _expected_user_del_metadata(self):
|
||||
return [
|
||||
(DRSUAPI_ATTID_objectClass, 1),
|
||||
(DRSUAPI_ATTID_cn, 2),
|
||||
(DRSUAPI_ATTID_instanceType, 1),
|
||||
(DRSUAPI_ATTID_whenCreated, 1),
|
||||
(DRSUAPI_ATTID_isDeleted, 1),
|
||||
(DRSUAPI_ATTID_ntSecurityDescriptor, 1),
|
||||
(DRSUAPI_ATTID_name, 2),
|
||||
(DRSUAPI_ATTID_userAccountControl, None),
|
||||
(DRSUAPI_ATTID_codePage, 2),
|
||||
(DRSUAPI_ATTID_countryCode, 2),
|
||||
(DRSUAPI_ATTID_dBCSPwd, 1),
|
||||
(DRSUAPI_ATTID_logonHours, 1),
|
||||
(DRSUAPI_ATTID_unicodePwd, 1),
|
||||
(DRSUAPI_ATTID_ntPwdHistory, 1),
|
||||
(DRSUAPI_ATTID_pwdLastSet, 2),
|
||||
(DRSUAPI_ATTID_primaryGroupID, 2),
|
||||
(DRSUAPI_ATTID_objectSid, 1),
|
||||
(DRSUAPI_ATTID_accountExpires, 2),
|
||||
(DRSUAPI_ATTID_lmPwdHistory, 1),
|
||||
(DRSUAPI_ATTID_sAMAccountName, 1),
|
||||
(DRSUAPI_ATTID_sAMAccountType, 2),
|
||||
(DRSUAPI_ATTID_lastKnownParent, 1),
|
||||
(DRSUAPI_ATTID_objectCategory, 2),
|
||||
(DRSUAPI_ATTID_isRecycled, 1)]
|
||||
|
||||
def _expected_user_restore_attributes(self, username, guid, sid, user_dn, category):
|
||||
return {'dn': user_dn,
|
||||
'objectClass': '**',
|
||||
'cn': username,
|
||||
'distinguishedName': user_dn,
|
||||
'instanceType': '4',
|
||||
'whenCreated': '**',
|
||||
'whenChanged': '**',
|
||||
'uSNCreated': '**',
|
||||
'uSNChanged': '**',
|
||||
'name': username,
|
||||
'objectGUID': guid,
|
||||
'userAccountControl': '546',
|
||||
'badPwdCount': '0',
|
||||
'badPasswordTime': '0',
|
||||
'codePage': '0',
|
||||
'countryCode': '0',
|
||||
'lastLogon': '0',
|
||||
'lastLogoff': '0',
|
||||
'pwdLastSet': '0',
|
||||
'primaryGroupID': '513',
|
||||
'operatorCount': '0',
|
||||
'objectSid': sid,
|
||||
'adminCount': '0',
|
||||
'accountExpires': '0',
|
||||
'logonCount': '0',
|
||||
'sAMAccountName': username,
|
||||
'sAMAccountType': '805306368',
|
||||
'lastKnownParent': 'CN=Users,%s' % self.base_dn,
|
||||
'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
|
||||
}
|
||||
|
||||
def _expected_user_restore_metadata(self):
|
||||
return [
|
||||
(DRSUAPI_ATTID_objectClass, 1),
|
||||
(DRSUAPI_ATTID_cn, 3),
|
||||
(DRSUAPI_ATTID_instanceType, 1),
|
||||
(DRSUAPI_ATTID_whenCreated, 1),
|
||||
(DRSUAPI_ATTID_isDeleted, 2),
|
||||
(DRSUAPI_ATTID_ntSecurityDescriptor, 1),
|
||||
(DRSUAPI_ATTID_name, 3),
|
||||
(DRSUAPI_ATTID_userAccountControl, None),
|
||||
(DRSUAPI_ATTID_codePage, 3),
|
||||
(DRSUAPI_ATTID_countryCode, 3),
|
||||
(DRSUAPI_ATTID_dBCSPwd, 1),
|
||||
(DRSUAPI_ATTID_logonHours, 1),
|
||||
(DRSUAPI_ATTID_unicodePwd, 1),
|
||||
(DRSUAPI_ATTID_ntPwdHistory, 1),
|
||||
(DRSUAPI_ATTID_pwdLastSet, 3),
|
||||
(DRSUAPI_ATTID_primaryGroupID, 3),
|
||||
(DRSUAPI_ATTID_operatorCount, 1),
|
||||
(DRSUAPI_ATTID_objectSid, 1),
|
||||
(DRSUAPI_ATTID_adminCount, 1),
|
||||
(DRSUAPI_ATTID_accountExpires, 3),
|
||||
(DRSUAPI_ATTID_lmPwdHistory, 1),
|
||||
(DRSUAPI_ATTID_sAMAccountName, 1),
|
||||
(DRSUAPI_ATTID_sAMAccountType, 3),
|
||||
(DRSUAPI_ATTID_lastKnownParent, 1),
|
||||
(DRSUAPI_ATTID_objectCategory, 3),
|
||||
(DRSUAPI_ATTID_isRecycled, 2)]
|
||||
|
||||
def test_restore_user(self):
|
||||
print "Test restored user attributes"
|
||||
username = "restore_user"
|
||||
@ -357,18 +529,31 @@ class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
|
||||
"sAMAccountName": username})
|
||||
obj = self.search_dn(usr_dn)
|
||||
guid = obj["objectGUID"][0]
|
||||
sid = obj["objectSID"][0]
|
||||
obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
|
||||
self.assertAttributesExists(self._expected_user_add_attributes(username, usr_dn, "Person"), obj)
|
||||
self._check_metadata(obj_rmd["replPropertyMetaData"],
|
||||
self._expected_user_add_metadata())
|
||||
self.samdb.delete(usr_dn)
|
||||
obj_del = self.search_guid(guid)
|
||||
obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
|
||||
orig_attrs = set(obj.keys())
|
||||
del_attrs = set(obj_del.keys())
|
||||
self.assertAttributesExists(self._expected_user_del_attributes(username, guid, sid), obj_del)
|
||||
self._check_metadata(obj_del_rmd["replPropertyMetaData"],
|
||||
self._expected_user_del_metadata())
|
||||
# restore the user and fetch what's restored
|
||||
self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
|
||||
obj_restore = self.search_guid(guid)
|
||||
obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
|
||||
# check original attributes and restored one are same
|
||||
orig_attrs = set(obj.keys())
|
||||
# windows restore more attributes that originally we have
|
||||
orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
|
||||
rest_attrs = set(obj_restore.keys())
|
||||
self.assertAttributesEqual(obj, orig_attrs, obj_restore, rest_attrs)
|
||||
self.assertAttributesExists(self._expected_user_attributes(username, usr_dn, "Person"), obj_restore)
|
||||
self.assertAttributesExists(self._expected_user_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
|
||||
self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
|
||||
self._expected_user_restore_metadata())
|
||||
|
||||
|
||||
class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
|
||||
|
Loading…
Reference in New Issue
Block a user