1
0
mirror of https://github.com/samba-team/samba.git synced 2025-07-15 16:59:09 +03:00

s4 upgradeprovision: Move functions to helpers and improve code

Among code improvement the most significant part is that we now
compare DN object instead of their string representation. It allow
 to better react to case an white space difference.
Some new move objects have been added (ie. System into well known
security principals).

This will allow more unittesting

Signed-off-by: Jelmer Vernooij <jelmer@samba.org>
This commit is contained in:
Matthieu Patou
2010-06-08 00:01:16 +04:00
committed by Jelmer Vernooij
parent 8ff65b0136
commit fbeacc1013
5 changed files with 494 additions and 297 deletions

View File

@ -18,90 +18,39 @@
#
import os
from samba.credentials import Credentials
from samba.auth import system_session
from samba.upgradehelpers import get_paths, usn_in_range, get_ldbs,\
find_provision_key_parameters, dn_sort,\
identic_rename, get_diff_sddls
from samba import param
from samba.upgradehelpers import usn_in_range, dn_sort,\
get_diff_sddls, update_secrets
from samba.tests.provision import create_dummy_secretsdb
from samba.tests import env_loadparm, TestCaseInTempDir
import ldb
from samba import Ldb
from ldb import SCOPE_SUBTREE
import samba.tests
lp = env_loadparm()
def dummymessage(a=None, b=None):
if 0:
print "none"
class UpgradeProvisionTestCase(TestCaseInTempDir):
"""Some simple tests for individual functions in the provisioning code.
"""
def test_get_paths(self):
smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf")
targetdir = os.path.join(os.environ["SELFTEST_PREFIX"], "dc")
privatePath = os.path.join(targetdir, "private")
paths = get_paths(param, None, smbConfPath)
self.assertEquals(paths.private_dir, privatePath)
paths2 = get_paths(param, targetdir)
self.assertEquals(paths2.private_dir, privatePath)
def test_usn_in_range(self):
range = [5, 25, 35, 55]
range = []
range.append(5)
range.append(25)
range.append(35)
range.append(55)
vals = []
vals.append(3)
vals.append(26)
vals.append(56)
vals = [3, 26, 56]
for v in vals:
self.assertFalse(usn_in_range(v, range))
vals = []
vals.append(5)
vals.append(20)
vals.append(25)
vals.append(35)
vals.append(36)
vals = [5, 20, 25, 35, 36]
for v in vals:
self.assertTrue(usn_in_range(v, range))
def test_get_ldbs(self):
smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf")
paths = get_paths(param, None, smbConfPath)
creds = Credentials()
creds.guess(lp)
try:
get_ldbs(paths, creds, system_session(), lp)
except:
self.assertTrue(0)
def test_find_key_param(self):
smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf")
paths = get_paths(param, None, smbConfPath)
creds = Credentials()
creds.guess(lp)
rootdn = "dc=samba,dc=example,dc=com"
ldbs = get_ldbs(paths, creds, system_session(), lp)
find_provision_key_parameters(ldbs.sam, ldbs.secrets, paths,
smbConfPath, lp)
try:
names = find_provision_key_parameters(ldbs.sam, ldbs.secrets, paths,
smbConfPath, lp)
except:
self.assertTrue(0)
self.assertTrue(names.realm == "SAMBA.EXAMPLE.COM")
self.assertTrue(str(names.rootdn).lower() == rootdn.lower())
self.assertTrue(names.ntdsguid != "")
def test_dn_sort(self):
# higher level comes after lower even if lexicographicaly closer
# ie dc=tata,dc=toto (2 levels), comes after dc=toto
@ -111,27 +60,7 @@ class UpgradeProvisionTestCase(TestCaseInTempDir):
self.assertEquals(dn_sort("dc=toto,dc=tata",
"cn=foo,dc=toto,dc=tata"), -1)
self.assertEquals(dn_sort("cn=bar, dc=toto,dc=tata",
"cn=foo, dc=toto,dc=tata"), -1)
def test_identic_rename(self):
smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf")
paths = get_paths(param, None, smbConfPath)
creds = Credentials()
creds.guess(lp)
rootdn = "DC=samba,DC=example,DC=com"
ldbs = get_ldbs(paths, creds, system_session(), lp)
guestDN = ldb.Dn(ldbs.sam, "CN=Guest,CN=Users,%s" % rootdn)
try:
identic_rename(ldbs.sam, guestDN)
res = ldbs.sam.search(expression="(name=Guest)", base=rootdn,
scope=ldb.SCOPE_SUBTREE, attrs=["dn"])
except:
self.assertTrue(0)
self.assertEquals(len(res), 1)
self.assertEquals(str(res[0]["dn"]), "CN=Guest,CN=Users,%s" % rootdn)
"cn=foo, dc=toto,dc=tata"),-1)
def test_get_diff_sddl(self):
sddl = "O:SAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\
(A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)S:AI(AU;CIIDSA;WP;;;WD)"
@ -148,9 +77,9 @@ class UpgradeProvisionTestCase(TestCaseInTempDir):
self.assertEquals(get_diff_sddls(sddl, sddl1) ,"")
txt = get_diff_sddls(sddl, sddl2)
self.assertEquals(txt ,"\tOwner mismatch: SA (in ref) BA (in current)\n")
self.assertEquals(txt ,"\tOwner mismatch: SA (in ref) BA(in current)\n")
txt = get_diff_sddls(sddl, sddl3)
self.assertEquals(txt ,"\tGroup mismatch: DU (in ref) BA (in current)\n")
self.assertEquals(txt ,"\tGroup mismatch: DU (in ref) BA(in current)\n")
txt = get_diff_sddls(sddl, sddl4)
txtmsg = "\tPart dacl is different between reference and current here\
is the detail:\n\t\t(A;CIID;RPWPCRCCLCLORCWOWDSW;;;BA) ACE is not present in\
@ -159,3 +88,41 @@ class UpgradeProvisionTestCase(TestCaseInTempDir):
self.assertEquals(txt , txtmsg)
txt = get_diff_sddls(sddl, sddl5)
self.assertEquals(txt ,"\tCurrent ACL hasn't a sacl part\n")
class UpdateSecretsTests(samba.tests.TestCaseInTempDir):
def setUp(self):
super(UpdateSecretsTests, self).setUp()
self.referencedb = create_dummy_secretsdb(
os.path.join(self.tempdir, "ref.ldb"))
def _getEmptyDb(self):
return Ldb(os.path.join(self.tempdir, "secrets.ldb"))
def _getCurrentFormatDb(self):
return create_dummy_secretsdb(
os.path.join(self.tempdir, "secrets.ldb"))
def test_trivial(self):
# Test that updating an already up-to-date secretsdb works fine
self.secretsdb = self._getCurrentFormatDb()
self.assertEquals(None,
update_secrets(self.referencedb, self.secretsdb, dummymessage))
def test_update_modules(self):
empty_db = self._getEmptyDb()
update_secrets(self.referencedb, empty_db, dummymessage)
newmodules = empty_db.search(
expression="dn=@MODULES", base="", scope=SCOPE_SUBTREE)
refmodules = self.referencedb.search(
expression="dn=@MODULES", base="", scope=SCOPE_SUBTREE)
self.assertEquals(newmodules, refmodules)
def tearDown(self):
for name in ["ref.ldb", "secrets.ldb"]:
path = os.path.join(self.tempdir, name)
if os.path.exists(path):
os.unlink(path)
super(UpdateSecretsTests, self).tearDown()

View File

@ -0,0 +1,138 @@
#!/usr/bin/python
# Unix SMB/CIFS implementation.
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2008
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import re
import shutil
from samba.credentials import Credentials
from samba.auth import system_session
from samba.provision import getpolicypath
from samba.upgradehelpers import (get_paths, get_ldbs,
find_provision_key_parameters, identic_rename,
updateOEMInfo, getOEMInfo, update_gpo,
delta_update_basesamdb)
from samba.tests.provision import create_dummy_secretsdb
from samba import param
from samba.tests import env_loadparm, TestCaseInTempDir
import ldb
def dummymessage(a=None, b=None):
if 0:
print "none"
lp = env_loadparm()
smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf")
class UpgradeProvisionBasicLdbHelpersTestCase(TestCaseInTempDir):
"""Some simple tests for individual functions in the provisioning code.
"""
def test_get_ldbs(self):
paths = get_paths(param, None, smbConfPath)
creds = Credentials()
creds.guess(lp)
get_ldbs(paths, creds, system_session(), lp)
def test_find_key_param(self):
paths = get_paths(param, None, smbConfPath)
creds = Credentials()
creds.guess(lp)
rootdn = "dc=samba,dc=example,dc=com"
ldbs = get_ldbs(paths, creds, system_session(), lp)
names = find_provision_key_parameters(ldbs.sam, ldbs.secrets, ldbs.idmap,
paths, smbConfPath, lp)
self.assertEquals(names.realm, "SAMBA.EXAMPLE.COM")
self.assertTrue(str(names.rootdn).lower() == rootdn.lower())
self.assertTrue(names.policyid_dc != None)
self.assertTrue(names.ntdsguid != "")
class UpgradeProvisionWithLdbTestCase(TestCaseInTempDir):
def _getEmptyDbName(self):
return os.path.join(self.tempdir, "sam.ldb")
def setUp(self):
super(UpgradeProvisionWithLdbTestCase, self).setUp()
paths = get_paths(param, None, smbConfPath)
self.creds = Credentials()
self.creds.guess(lp)
self.paths = paths
self.ldbs = get_ldbs(paths, self.creds, system_session(), lp)
self.lp = lp
self.names = find_provision_key_parameters(self.ldbs.sam, self.ldbs.secrets,
self.ldbs.idmap, paths, smbConfPath, lp)
self.referencedb = create_dummy_secretsdb(
os.path.join(self.tempdir, "ref.ldb"))
def test_identic_rename(self):
rootdn = "DC=samba,DC=example,DC=com"
guestDN = ldb.Dn(self.ldbs.sam, "CN=Guest,CN=Users,%s" % rootdn)
identic_rename(self.ldbs.sam, guestDN)
res = self.ldbs.sam.search(expression="(name=Guest)", base=rootdn,
scope=ldb.SCOPE_SUBTREE, attrs=["dn"])
self.assertEquals(len(res), 1)
self.assertEquals(str(res[0]["dn"]), "CN=Guest,CN=Users,%s" % rootdn)
def test_delta_update_basesamdb(self):
dummysampath = self._getEmptyDbName()
delta_update_basesamdb(self.paths.samdb, dummysampath,
self.creds, system_session(), self.lp, dummymessage)
def test_update_gpo_simple(self):
dir = getpolicypath(self.paths.sysvol, self.names.dnsdomain, self.names.policyid)
shutil.rmtree(dir)
self.assertFalse(os.path.isdir(dir))
update_gpo(self.paths, self.ldbs.sam, self.names, self.lp, dummymessage)
self.assertTrue(os.path.isdir(dir))
def test_update_gpo_acl(self):
path = os.path.join(self.tempdir, "testupdategpo")
save = self.paths.sysvol
self.paths.sysvol = path
os.mkdir(path)
os.mkdir(os.path.join(path, self.names.dnsdomain))
os.mkdir(os.path.join(os.path.join(path, self.names.dnsdomain), "Policies"))
update_gpo(self.paths, self.ldbs.sam, self.names, self.lp, dummymessage)
shutil.rmtree(path)
self.paths.sysvol = save
def test_getOEMInfo(self):
realm = self.lp.get("realm")
basedn = "DC=%s" % realm.replace(".", ", DC=")
oem = getOEMInfo(self.ldbs.sam, basedn)
self.assertTrue(oem != "")
def test_updateOEMInfo(self):
realm = self.lp.get("realm")
basedn = "DC=%s" % realm.replace(".", ", DC=")
oem = getOEMInfo(self.ldbs.sam, basedn)
updateOEMInfo(self.ldbs.sam, basedn)
oem2 = getOEMInfo(self.ldbs.sam, basedn)
self.assertTrue(str(oem) != str(oem2))
self.assertTrue(re.match(".*upgrade to.*", str(oem2)))
def tearDown(self):
for name in ["ref.ldb", "secrets.ldb", "sam.ldb"]:
path = os.path.join(self.tempdir, name)
if os.path.exists(path):
os.unlink(path)
super(UpgradeProvisionWithLdbTestCase, self).tearDown()

View File

@ -26,21 +26,44 @@ import os
import string
import re
import shutil
import samba
from samba import Ldb, version, ntacls
from samba.dsdb import DS_DOMAIN_FUNCTION_2000
from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE
import ldb
from samba import Ldb
from samba.dcerpc import misc, security
from samba.dsdb import DS_DOMAIN_FUNCTION_2000
from samba.provision import (ProvisionNames, provision_paths_from_lp,
FILL_FULL, provision, ProvisioningError)
from samba.provision import ProvisionNames, provision_paths_from_lp,\
getpolicypath, set_gpo_acl, create_gpo_struct,\
FILL_FULL, provision, ProvisioningError,\
setsysvolacl
from samba.dcerpc import misc, security, xattr
from samba.ndr import ndr_unpack
# All the ldb related to registry are commented because the path for them is relative
# in the provisionPath object
# And so opening them create a file in the current directory which is not what we want
# I still keep them commented because I plan soon to make more cleaner
ERROR = -1
SIMPLE = 0x00
CHANGE = 0x01
CHANGESD = 0x02
GUESS = 0x04
PROVISION = 0x08
CHANGEALL = 0xff
hashAttrNotCopied = { "dn": 1, "whenCreated": 1, "whenChanged": 1,
"objectGUID": 1, "uSNCreated": 1,
"replPropertyMetaData": 1, "uSNChanged": 1,
"parentGUID": 1, "objectCategory": 1,
"distinguishedName": 1, "nTMixedDomain": 1,
"showInAdvancedViewOnly": 1, "instanceType": 1,
"msDS-Behavior-Version":1, "nextRid":1, "cn": 1,
"versionNumber":1, "lmPwdHistory":1, "pwdLastSet": 1,
"ntPwdHistory":1, "unicodePwd":1,"dBCSPwd":1,
"supplementalCredentials":1, "gPCUserExtensionNames":1,
"gPCMachineExtensionNames":1,"maxPwdAge":1, "secret":1,
"possibleInferiors":1, "privilege":1,
"sAMAccountType":1 }
class ProvisionLDB(object):
def __init__(self):
@ -165,11 +188,12 @@ def get_paths(param, targetdir=None, smbconf=None):
return paths
def find_provision_key_parameters(samdb, secretsdb, paths, smbconf, lp):
def find_provision_key_parameters(samdb, secretsdb, idmapdb, paths, smbconf, lp):
"""Get key provision parameters (realm, domain, ...) from a given provision
:param samdb: An LDB object connected to the sam.ldb file
:param secretsdb: An LDB object connected to the secrets.ldb file
:param idmapdb: An LDB object connected to the idmap.ldb file
:param paths: A list of path to provision object
:param smbconf: Path to the smb.conf file
:param lp: A LoadParm object
@ -181,8 +205,8 @@ def find_provision_key_parameters(samdb, secretsdb, paths, smbconf, lp):
# NT domain, kerberos realm, root dn, domain dn, domain dns name
names.domain = string.upper(lp.get("workgroup"))
names.realm = lp.get("realm")
basedn = "DC=" + names.realm.replace(".", ",DC=")
names.dnsdomain = names.realm
basedn = "DC=" + names.realm.replace(".",",DC=")
names.dnsdomain = names.realm.lower()
names.realm = string.upper(names.realm)
# netbiosname
# Get the netbiosname first (could be obtained from smb.conf in theory)
@ -252,7 +276,12 @@ def find_provision_key_parameters(samdb, secretsdb, paths, smbconf, lp):
names.policyid_dc = str(res8[0]["cn"]).replace("{","").replace("}","")
else:
names.policyid_dc = None
res9 = idmapdb.search(expression="(cn=%s)" % (security.SID_BUILTIN_ADMINISTRATORS),
attrs=["xidNumber"])
if len(res9) == 1:
names.wheel_gid = res9[0]["xidNumber"]
else:
raise ProvisioningError("Unable to find uid/gid for Domain Admins rid")
return names
@ -433,3 +462,214 @@ def get_diff_sddls(refsddl, cursddl):
txt = "%s\tCurrent ACL hasn't a %s part\n" % (txt, part)
return txt
def update_secrets(newsecrets_ldb, secrets_ldb, messagefunc):
"""Update secrets.ldb
:param newsecrets_ldb: An LDB object that is connected to the secrets.ldb
of the reference provision
:param secrets_ldb: An LDB object that is connected to the secrets.ldb
of the updated provision
"""
messagefunc(SIMPLE, "update secrets.ldb")
reference = newsecrets_ldb.search(expression="dn=@MODULES", base="",
scope=SCOPE_SUBTREE)
current = secrets_ldb.search(expression="dn=@MODULES", base="",
scope=SCOPE_SUBTREE)
assert reference, "Reference modules list can not be empty"
if len(current) == 0:
# No modules present
delta = secrets_ldb.msg_diff(ldb.Message(), reference[0])
delta.dn = reference[0].dn
secrets_ldb.add(reference[0])
else:
delta = secrets_ldb.msg_diff(current[0], reference[0])
delta.dn = current[0].dn
secrets_ldb.modify(delta)
reference = newsecrets_ldb.search(expression="objectClass=top", base="",
scope=SCOPE_SUBTREE, attrs=["dn"])
current = secrets_ldb.search(expression="objectClass=top", base="",
scope=SCOPE_SUBTREE, attrs=["dn"])
hash_new = {}
hash = {}
listMissing = []
listPresent = []
empty = ldb.Message()
for i in range(0, len(reference)):
hash_new[str(reference[i]["dn"]).lower()] = reference[i]["dn"]
# Create a hash for speeding the search of existing object in the
# current provision
for i in range(0, len(current)):
hash[str(current[i]["dn"]).lower()] = current[i]["dn"]
for k in hash_new.keys():
if not hash.has_key(k):
listMissing.append(hash_new[k])
else:
listPresent.append(hash_new[k])
for entry in listMissing:
reference = newsecrets_ldb.search(expression="dn=%s" % entry,
base="", scope=SCOPE_SUBTREE)
current = secrets_ldb.search(expression="dn=%s" % entry,
base="", scope=SCOPE_SUBTREE)
delta = secrets_ldb.msg_diff(empty, reference[0])
for att in hashAttrNotCopied.keys():
delta.remove(att)
messagefunc(CHANGE, "Entry %s is missing from secrets.ldb" % reference[0].dn)
for att in delta:
messagefunc(CHANGE, " Adding attribute %s" % att)
delta.dn = reference[0].dn
secrets_ldb.add(delta)
for entry in listPresent:
reference = newsecrets_ldb.search(expression="dn=%s" % entry,
base="", scope=SCOPE_SUBTREE)
current = secrets_ldb.search(expression="dn=%s" % entry, base="",
scope=SCOPE_SUBTREE)
delta = secrets_ldb.msg_diff(current[0], reference[0])
for att in hashAttrNotCopied.keys():
delta.remove(att)
for att in delta:
if att == "name":
messagefunc(CHANGE, "Found attribute name on %s," \
" must rename the DN" % (current[0].dn))
identic_rename(secrets_ldb, reference[0].dn)
else:
delta.remove(att)
for entry in listPresent:
reference = newsecrets_ldb.search(expression="dn=%s" % entry, base="",
scope=SCOPE_SUBTREE)
current = secrets_ldb.search(expression="dn=%s" % entry, base="",
scope=SCOPE_SUBTREE)
delta = secrets_ldb.msg_diff(current[0], reference[0])
for att in hashAttrNotCopied.keys():
delta.remove(att)
for att in delta:
if att != "dn":
messagefunc(CHANGE,
"Adding/Changing attribute %s to %s" % (att, current[0].dn))
delta.dn = current[0].dn
secrets_ldb.modify(delta)
def getOEMInfo(samdb, rootdn):
"""Return OEM Information on the top level
Samba4 use to store version info in this field
:param samdb: An LDB object connect to sam.ldb
:param rootdn: Root DN of the domain
:return: The content of the field oEMInformation (if any)"""
res = samdb.search(expression="(objectClass=*)", base=str(rootdn),
scope=SCOPE_BASE, attrs=["dn", "oEMInformation"])
if len(res) > 0:
info = res[0]["oEMInformation"]
return info
else:
return ""
def updateOEMInfo(samdb, rootdn):
"""Update the OEMinfo field to add information about upgrade
:param samdb: an LDB object connected to the sam DB
:param rootdn: The string representation of the root DN of
the provision (ie. DC=...,DC=...)
"""
res = samdb.search(expression="(objectClass=*)", base=rootdn,
scope=SCOPE_BASE, attrs=["dn", "oEMInformation"])
if len(res) > 0:
info = res[0]["oEMInformation"]
info = "%s, upgrade to %s" % (info, version)
delta = ldb.Message()
delta.dn = ldb.Dn(samdb, str(res[0]["dn"]))
delta["oEMInformation"] = ldb.MessageElement(info, ldb.FLAG_MOD_REPLACE,
"oEMInformation" )
samdb.modify(delta)
def update_gpo(paths, samdb, names, lp, message, force=0):
"""Create missing GPO file object if needed
Set ACL correctly also.
Check ACLs for sysvol/netlogon dirs also
"""
resetacls = 0
try:
ntacls.checkset_backend(lp, None, None)
eadbname = lp.get("posix:eadb")
if eadbname is not None and eadbname != "":
try:
attribute = samba.xattr_tdb.wrap_getxattr(eadbname, paths.sysvol,
xattr.XATTR_NTACL_NAME)
except:
attribute = samba.xattr_native.wrap_getxattr(paths.sysvol,
xattr.XATTR_NTACL_NAME)
else:
attribute = samba.xattr_native.wrap_getxattr(paths.sysvol,
xattr.XATTR_NTACL_NAME)
except:
resetacls = 1
if force:
resetacls = 1
dir = getpolicypath(paths.sysvol, names.dnsdomain, names.policyid)
if not os.path.isdir(dir):
create_gpo_struct(dir)
dir = getpolicypath(paths.sysvol, names.dnsdomain, names.policyid_dc)
if not os.path.isdir(dir):
create_gpo_struct(dir)
# We always reinforce acls on GPO folder because they have to be in sync
# with the one in DS
set_gpo_acl(paths.sysvol, names.dnsdomain, names.domainsid,
names.domaindn, samdb, lp)
if resetacls:
setsysvolacl(samdb, paths.netlogon, paths.sysvol, names.wheel_gid,
names.domainsid, names.dnsdomain, names.domaindn, lp)
def delta_update_basesamdb(refsam, sam, creds, session, lp, message):
"""Update the provision container db: sam.ldb
This function is aimed for alpha9 and newer;
:param refsam: Path to the samdb in the reference provision
:param sam: Path to the samdb in the upgraded provision
:param creds: Credential used for openning LDB files
:param session: Session to use for openning LDB files
:param lp: A loadparam object"""
message(SIMPLE,
"Update base samdb by searching difference with reference one")
refsam = Ldb(refsam, session_info=session, credentials=creds,
lp=lp, options=["modules:"])
sam = Ldb(sam, session_info=session, credentials=creds, lp=lp,
options=["modules:"])
empty = ldb.Message()
reference = refsam.search(expression="")
for refentry in reference:
entry = sam.search(expression="dn=%s" % refentry["dn"],
scope=SCOPE_SUBTREE)
if not len(entry):
delta = sam.msg_diff(empty, refentry)
message(CHANGE, "Adding %s to sam db" % str(refentry.dn))
if str(refentry.dn) == "@PROVISION" and\
delta.get(samba.provision.LAST_PROVISION_USN_ATTRIBUTE):
delta.remove(samba.provision.LAST_PROVISION_USN_ATTRIBUTE)
delta.dn = refentry.dn
sam.add(delta)
else:
delta = sam.msg_diff(entry[0], refentry)
if str(refentry.dn) == "@PROVISION" and\
delta.get(samba.provision.LAST_PROVISION_USN_ATTRIBUTE):
delta.remove(samba.provision.LAST_PROVISION_USN_ATTRIBUTE)
if len(delta.items()) > 1:
delta.dn = refentry.dn
sam.modify(delta)