1
0
mirror of https://github.com/samba-team/samba.git synced 2025-08-03 04:22:09 +03:00

s4 upgradeprovision: move some functions to upgradehelpers for unit tests

Signed-off-by: Jelmer Vernooij <jelmer@samba.org>
This commit is contained in:
Matthieu Patou
2010-06-08 00:52:25 +04:00
committed by Jelmer Vernooij
parent 0537de17c1
commit ad55248958
2 changed files with 154 additions and 130 deletions

View File

@ -53,7 +53,8 @@ from samba.dcerpc import security, drsblobs
from samba.ndr import ndr_unpack from samba.ndr import ndr_unpack
from samba.dcerpc.misc import SEC_CHAN_BDC from samba.dcerpc.misc import SEC_CHAN_BDC
from samba.upgradehelpers import dn_sort, get_paths, newprovision,\ from samba.upgradehelpers import dn_sort, get_paths, newprovision,\
find_provision_key_parameters, get_ldbs find_provision_key_parameters, get_ldbs,\
usn_in_range, identic_rename, get_diff_sddls
replace=2**FLAG_MOD_REPLACE replace=2**FLAG_MOD_REPLACE
add=2**FLAG_MOD_ADD add=2**FLAG_MOD_ADD
@ -179,42 +180,6 @@ if setup_dir is None:
setup_dir = find_setup_dir() setup_dir = find_setup_dir()
def identic_rename(ldbobj,dn):
"""Perform a back and forth rename to trigger renaming on attribute that can't be directly modified.
:param lbdobj: An Ldb Object
:param dn: DN of the object to manipulate """
(before, sep, after)=str(dn).partition('=')
ldbobj.rename(dn, Dn(ldbobj, "%s=foo%s" % (before, after)))
ldbobj.rename(Dn(ldbobj, "%s=foo%s" % (before, after)), dn)
def usn_in_range(usn, range):
"""Check if the usn is in one of the range provided.
To do so, the value is checked to be between the lower bound and
higher bound of a range
:param usn: A integer value corresponding to the usn that we want to update
:param range: A list of integer representing ranges, lower bounds are in
the even indices, higher in odd indices
:return: 1 if the usn is in one of the range, 0 otherwise"""
idx = 0
cont = 1
ok = 0
while (cont == 1):
if idx == len(range):
cont = 0
continue
if usn < int(range[idx]):
if idx %2 == 1:
ok = 1
cont = 0
if usn == int(range[idx]):
cont = 0
ok = 1
idx = idx + 1
return ok
def check_for_DNS(refprivate, private): def check_for_DNS(refprivate, private):
"""Check if the provision has already the requirement for dynamic dns """Check if the provision has already the requirement for dynamic dns
@ -250,9 +215,6 @@ def check_for_DNS(refprivate, private):
" dns update" % destdir) " dns update" % destdir)
# dnsupdate:path
def populate_links(samdb, schemadn): def populate_links(samdb, schemadn):
"""Populate an array with all the back linked attributes """Populate an array with all the back linked attributes
@ -1051,48 +1013,7 @@ def update_partition(ref_samdb, samdb, basedn, names, schema, provisionUSNs):
traceback.print_exception(typ, val, tb) traceback.print_exception(typ, val, tb)
return 0 return 0
def chunck_acl(acl):
"""Return separate ACE of an ACL
:param acl: A string representing the ACL
:return: A hash with different parts
"""
p = re.compile(r'(\w+)?(\(.*?\))')
tab = p.findall(acl)
hash = {}
hash["aces"] = []
for e in tab:
if len(e[0]) > 0:
hash["flags"] = e[0]
hash["aces"].append(e[1])
return hash
def chunck_sddl(sddl):
""" Return separate parts of the SDDL (owner, group, ...)
:param sddl: An string containing the SDDL to chunk
:return: A hash with the different chunk
"""
p = re.compile(r'([OGDS]:)(.*?)(?=(?:[GDS]:|$))')
tab = p.findall(sddl)
hash = {}
for e in tab:
if e[0] == "O:":
hash["owner"] = e[1]
if e[0] == "G:":
hash["group"] = e[1]
if e[0] == "D:":
hash["dacl"] = e[1]
if e[0] == "S:":
hash["sacl"] = e[1]
return hash
def check_updated_sd(ref_sam, cur_sam, names): def check_updated_sd(ref_sam, cur_sam, names):
"""Check if the security descriptor in the upgraded provision are the same """Check if the security descriptor in the upgraded provision are the same
@ -1112,59 +1033,22 @@ def check_updated_sd(ref_sam, cur_sam, names):
attrs=["dn", "nTSecurityDescriptor"], attrs=["dn", "nTSecurityDescriptor"],
controls=["search_options:1:2"]) controls=["search_options:1:2"])
hash = {} hash = {}
for i in range(0,len(reference)): for i in range(0, len(reference)):
hash[str(reference[i]["dn"]).lower()] = ndr_unpack(security.descriptor,str(reference[i]["nTSecurityDescriptor"])).as_sddl(names.domainsid) refsd = ndr_unpack(security.descriptor,
str(reference[i]["nTSecurityDescriptor"]))
hash[str(reference[i]["dn"]).lower()] = refsd.as_sddl(names.domainsid)
for i in range(0,len(current)): for i in range(0, len(current)):
key = str(current[i]["dn"]).lower() key = str(current[i]["dn"]).lower()
if hash.has_key(key): if hash.has_key(key):
sddl = ndr_unpack(security.descriptor,str(current[i]["nTSecurityDescriptor"])).as_sddl(names.domainsid) cursd = ndr_unpack(security.descriptor,
str(current[i]["nTSecurityDescriptor"]))
sddl = cursd.as_sddl(names.domainsid)
if sddl != hash[key]: if sddl != hash[key]:
txt = "" txt = get_diff_sddls(hash[key], sddl)
hash_new = chunck_sddl(sddl)
hash_ref = chunck_sddl(hash[key])
if hash_new["owner"] != hash_ref["owner"]:
txt = "\tOwner mismatch: %s (in ref) %s (in current provision)\n" % (hash_ref["owner"], hash_new["owner"])
if hash_new["group"] != hash_ref["group"]:
txt = "%s\tGroup mismatch: %s (in ref) %s (in current provision)\n" % (txt, hash_ref["group"], hash_new["group"])
for part in ["dacl","sacl"]:
if hash_new.has_key(part) and hash_ref.has_key(part):
# both are present, check if they contain the same ACE
h_new = {}
h_ref = {}
c_new = chunck_acl(hash_new[part])
c_ref = chunck_acl(hash_ref[part])
for elem in c_new["aces"]:
h_new[elem] = 1
for elem in c_ref["aces"]:
h_ref[elem] = 1
for k in h_ref.keys():
if h_new.has_key(k):
h_new.pop(k)
h_ref.pop(k)
if len(h_new.keys()) + len(h_ref.keys()) > 0:
txt = "%s\tPart %s is different between reference and current provision here is the detail:\n" % (txt, part)
for item in h_new.keys():
txt = "%s\t\t%s ACE is not present in the reference provision\n" % (txt, item)
for item in h_ref.keys():
txt = "%s\t\t%s ACE is not present in the current provision\n" % (txt, item)
elif hash_new.has_key(part) and not hash_ref.has_key(part):
txt = "%s\tReference provision ACL hasn't a %s part\n" % (txt, part)
elif not hash_new.has_key(part) and hash_ref.has_key(part):
txt = "%s\tCurrent provision ACL hasn't a %s part\n" % (txt, part)
if txt != "": if txt != "":
print "On object %s ACL is different\n%s" % (current[i]["dn"], txt) message(CHANGESD, "On object %s ACL is different"\
" \n%s" % (current[i]["dn"], txt))

View File

@ -114,6 +114,33 @@ def get_ldbs(paths, creds, session, lp):
return ldbs return ldbs
def usn_in_range(usn, range):
"""Check if the usn is in one of the range provided.
To do so, the value is checked to be between the lower bound and
higher bound of a range
:param usn: A integer value corresponding to the usn that we want to update
:param range: A list of integer representing ranges, lower bounds are in
the even indices, higher in odd indices
:return: 1 if the usn is in one of the range, 0 otherwise"""
idx = 0
cont = 1
ok = 0
while (cont == 1):
if idx == len(range):
cont = 0
continue
if usn < int(range[idx]):
if idx %2 == 1:
ok = 1
cont = 0
if usn == int(range[idx]):
cont = 0
ok = 1
idx = idx + 1
return ok
def get_paths(param, targetdir=None, smbconf=None): def get_paths(param, targetdir=None, smbconf=None):
"""Get paths to important provision objects (smb.conf, ldb files, ...) """Get paths to important provision objects (smb.conf, ldb files, ...)
@ -274,7 +301,7 @@ def dn_sort(x, y):
:param x: First object to compare :param x: First object to compare
:param y: Second object to compare :param y: Second object to compare
""" """
p = re.compile(r'(?<!\\),') p = re.compile(r'(?<!\\), ?')
tab1 = p.split(str(x)) tab1 = p.split(str(x))
tab2 = p.split(str(y)) tab2 = p.split(str(y))
minimum = min(len(tab1), len(tab2)) minimum = min(len(tab1), len(tab2))
@ -293,3 +320,116 @@ def dn_sort(x, y):
else: else:
return -1 return -1
return ret return ret
def identic_rename(ldbobj, dn):
"""Perform a back and forth rename to trigger renaming on attribute that
can't be directly modified.
:param lbdobj: An Ldb Object
:param dn: DN of the object to manipulate """
(before, sep, after)=str(dn).partition('=')
ldbobj.rename(dn, ldb.Dn(ldbobj, "%s=foo%s" % (before, after)))
ldbobj.rename(ldb.Dn(ldbobj, "%s=foo%s" % (before, after)), dn)
def chunck_acl(acl):
"""Return separate ACE of an ACL
:param acl: A string representing the ACL
:return: A hash with different parts
"""
p = re.compile(r'(\w+)?(\(.*?\))')
tab = p.findall(acl)
hash = {}
hash["aces"] = []
for e in tab:
if len(e[0]) > 0:
hash["flags"] = e[0]
hash["aces"].append(e[1])
return hash
def chunck_sddl(sddl):
""" Return separate parts of the SDDL (owner, group, ...)
:param sddl: An string containing the SDDL to chunk
:return: A hash with the different chunk
"""
p = re.compile(r'([OGDS]:)(.*?)(?=(?:[GDS]:|$))')
tab = p.findall(sddl)
hash = {}
for e in tab:
if e[0] == "O:":
hash["owner"] = e[1]
if e[0] == "G:":
hash["group"] = e[1]
if e[0] == "D:":
hash["dacl"] = e[1]
if e[0] == "S:":
hash["sacl"] = e[1]
return hash
def get_diff_sddls(refsddl, cursddl):
"""Get the difference between 2 sddl
This function split the textual representation of ACL into smaller
chunck in order to not to report a simple permutation as a difference
:param refsddl: First sddl to compare
:param cursddl: Second sddl to compare
:return: A string that explain difference between sddls"""
txt = ""
hash_new = chunck_sddl(cursddl)
hash_ref = chunck_sddl(refsddl)
if hash_new["owner"] != hash_ref["owner"]:
txt = "\tOwner mismatch: %s (in ref) %s" \
"(in current)\n" % (hash_ref["owner"], hash_new["owner"])
if hash_new["group"] != hash_ref["group"]:
txt = "%s\tGroup mismatch: %s (in ref) %s" \
"(in current)\n" % (txt, hash_ref["group"], hash_new["group"])
for part in ["dacl", "sacl"]:
if hash_new.has_key(part) and hash_ref.has_key(part):
# both are present, check if they contain the same ACE
h_new = {}
h_ref = {}
c_new = chunck_acl(hash_new[part])
c_ref = chunck_acl(hash_ref[part])
for elem in c_new["aces"]:
h_new[elem] = 1
for elem in c_ref["aces"]:
h_ref[elem] = 1
for k in h_ref.keys():
if h_new.has_key(k):
h_new.pop(k)
h_ref.pop(k)
if len(h_new.keys()) + len(h_ref.keys()) > 0:
txt = "%s\tPart %s is different between reference" \
" and current here is the detail:\n" % (txt, part)
for item in h_new.keys():
txt = "%s\t\t%s ACE is not present in the" \
" reference\n" % (txt, item)
for item in h_ref.keys():
txt = "%s\t\t%s ACE is not present in the" \
" current\n" % (txt, item)
elif hash_new.has_key(part) and not hash_ref.has_key(part):
txt = "%s\tReference ACL hasn't a %s part\n" % (txt, part)
elif not hash_new.has_key(part) and hash_ref.has_key(part):
txt = "%s\tCurrent ACL hasn't a %s part\n" % (txt, part)
return txt