1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-08 04:58:40 +03:00

LDAPCmp feature to compare nTSecurityDescriptors

New feature that enables LDAPCmp users to find unmatched or
missing ACEs in objects for the three naming contexts between
DCs in one domain (default) or different domains. Comparing
security descriptors is not the default action but attribute
compatison. So to activate the new mode there is --sd switch.
However there are two view modes to the new --sd action which
are 'section' (default) or 'collision'. In 'section' mode you
can only find differences connected to missing or value
unmatched ACEs but not disorder unmatch if ACE values and count
are the same. All of the mentioned differences plus disorder
ACE unmatch you can observe under 'collision' view however
it is more verbose.

Signed-off-by: Anatoliy Atanasov <anatoliy.atanasov@postpath.com>
This commit is contained in:
Zahari Zahariev 2010-09-30 04:13:02 +03:00 committed by Anatoliy Atanasov
parent bad98e37e7
commit 73763b3678

View File

@ -59,12 +59,17 @@ class LDAPBase(object):
options=ldb_options)
self.two_domains = cmd_opts.two
self.quiet = cmd_opts.quiet
self.descriptor = cmd_opts.descriptor
self.view = cmd_opts.view
self.verbose = cmd_opts.verbose
self.host = host
self.base_dn = self.find_basedn()
self.domain_netbios = self.find_netbios()
self.server_names = self.find_servers()
self.domain_name = re.sub("[Dd][Cc]=", "", self.base_dn).replace(",", ".")
self.domain_sid_bin = self.get_object_sid(self.base_dn)
self.domain_sid = self.find_domain_sid()
self.get_guid_map()
self.get_sid_map()
#
# Log some domain controller specific place-holers that are being used
# when compare content of two DCs. Uncomment for DEBUG purposes.
@ -72,9 +77,13 @@ class LDAPBase(object):
print "\n* Place-holders for %s:" % self.host
print 4*" " + "${DOMAIN_DN} => %s" % self.base_dn
print 4*" " + "${DOMAIN_NETBIOS} => %s" % self.domain_netbios
print 4*" " + "${SERVERNAME} => %s" % self.server_names
print 4*" " + "${SERVER_NAME} => %s" % self.server_names
print 4*" " + "${DOMAIN_NAME} => %s" % self.domain_name
def find_domain_sid(self):
res = self.ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
def find_servers(self):
"""
"""
@ -134,22 +143,210 @@ class LDAPBase(object):
res[key] = list(res[key])
return res
def get_descriptor(self, object_dn):
def get_descriptor_sddl(self, object_dn):
res = self.ldb.search(base=object_dn, scope=SCOPE_BASE, attrs=["nTSecurityDescriptor"])
return res[0]["nTSecurityDescriptor"][0]
desc = res[0]["nTSecurityDescriptor"][0]
desc = ndr_unpack(security.descriptor, desc)
return desc.as_sddl(self.domain_sid)
def guid_as_string(self, guid_blob):
""" Translate binary representation of schemaIDGUID to standard string representation.
@gid_blob: binary schemaIDGUID
"""
blob = "%s" % guid_blob
stops = [4, 2, 2, 2, 6]
index = 0
res = ""
x = 0
while x < len(stops):
tmp = ""
y = 0
while y < stops[x]:
c = hex(ord(blob[index])).replace("0x", "")
c = [None, "0" + c, c][len(c)]
if 2 * index < len(blob):
tmp = c + tmp
else:
tmp += c
index += 1
y += 1
res += tmp + " "
x += 1
assert index == len(blob)
return res.strip().replace(" ", "-")
def get_guid_map(self):
""" Build dictionary that maps GUID to 'name' attribute found in Schema or Extended-Rights.
"""
self.guid_map = {}
res = self.ldb.search(base="cn=schema,cn=configuration,%s" % self.base_dn, \
expression="(schemaIdGuid=*)", scope=SCOPE_SUBTREE, attrs=["schemaIdGuid", "name"])
for item in res:
self.guid_map[self.guid_as_string(item["schemaIdGuid"]).lower()] = item["name"][0]
#
res = self.ldb.search(base="cn=extended-rights,cn=configuration,%s" % self.base_dn, \
expression="(rightsGuid=*)", scope=SCOPE_SUBTREE, attrs=["rightsGuid", "name"])
for item in res:
self.guid_map[str(item["rightsGuid"]).lower()] = item["name"][0]
def get_sid_map(self):
""" Build dictionary that maps GUID to 'name' attribute found in Schema or Extended-Rights.
"""
self.sid_map = {}
res = self.ldb.search(base="%s" % self.base_dn, \
expression="(objectSid=*)", scope=SCOPE_SUBTREE, attrs=["objectSid", "sAMAccountName"])
for item in res:
try:
self.sid_map["%s" % ndr_unpack(security.dom_sid, item["objectSid"][0])] = item["sAMAccountName"][0]
except KeyError:
pass
class Descriptor(object):
def __init__(self, connection, dn):
self.con = connection
self.dn = dn
self.sddl = self.con.get_descriptor_sddl(self.dn)
self.dacl_list = self.extract_dacl()
def extract_dacl(self):
""" Extracts the DACL as a list of ACE string (with the brakets).
"""
try:
res = re.search("D:(.*?)(\(.*?\))S:", self.sddl).group(2)
except AttributeError:
return []
return re.findall("(\(.*?\))", res)
def fix_guid(self, ace):
res = "%s" % ace
guids = re.findall("[a-z0-9]+?-[a-z0-9]+-[a-z0-9]+-[a-z0-9]+-[a-z0-9]+", res)
# If there are not GUIDs to replace return the same ACE
if len(guids) == 0:
return res
for guid in guids:
try:
name = self.con.guid_map[guid.lower()]
res = res.replace(guid, name)
except KeyError:
# Do not bother if the GUID is not found in
# cn=Schema or cn=Extended-Rights
pass
return res
def fix_sid(self, ace):
res = "%s" % ace
sids = re.findall("S-[-0-9]+", res)
# If there are not SIDs to replace return the same ACE
if len(sids) == 0:
return res
for sid in sids:
try:
name = self.con.sid_map[sid]
res = res.replace(sid, name)
except KeyError:
# Do not bother if the SID is not found in baseDN
pass
return res
def fixit(self, ace):
""" Combine all replacement methods in one
"""
res = "%s" % ace
res = self.fix_guid(res)
res = self.fix_sid(res)
return res
def diff_1(self, other):
res = ""
if len(self.dacl_list) != len(other.dacl_list):
res += 4*" " + "Difference in ACE count:\n"
res += 8*" " + "=> %s\n" % len(self.dacl_list)
res += 8*" " + "=> %s\n" % len(other.dacl_list)
#
i = 0
flag = True
while True:
self_ace = None
other_ace = None
try:
self_ace = "%s" % self.dacl_list[i]
except IndexError:
self_ace = ""
#
try:
other_ace = "%s" % other.dacl_list[i]
except IndexError:
other_ace = ""
if len(self_ace) + len(other_ace) == 0:
break
self_ace_fixed = "%s" % self.fixit(self_ace)
other_ace_fixed = "%s" % other.fixit(other_ace)
if self_ace_fixed != other_ace_fixed:
res += "%60s * %s\n" % ( self_ace_fixed, other_ace_fixed )
flag = False
else:
res += "%60s | %s\n" % ( self_ace_fixed, other_ace_fixed )
i += 1
return (flag, res)
def diff_2(self, other):
res = ""
if len(self.dacl_list) != len(other.dacl_list):
res += 4*" " + "Difference in ACE count:\n"
res += 8*" " + "=> %s\n" % len(self.dacl_list)
res += 8*" " + "=> %s\n" % len(other.dacl_list)
#
common_aces = []
self_aces = []
other_aces = []
self_dacl_list_fixed = []
other_dacl_list_fixed = []
[self_dacl_list_fixed.append( self.fixit(ace) ) for ace in self.dacl_list]
[other_dacl_list_fixed.append( other.fixit(ace) ) for ace in other.dacl_list]
for ace in self_dacl_list_fixed:
try:
other_dacl_list_fixed.index(ace)
except ValueError:
self_aces.append(ace)
else:
common_aces.append(ace)
self_aces = sorted(self_aces)
if len(self_aces) > 0:
res += 4*" " + "ACEs found only in %s:\n" % self.con.host
for ace in self_aces:
res += 8*" " + ace + "\n"
#
for ace in other_dacl_list_fixed:
try:
self_dacl_list_fixed.index(ace)
except ValueError:
other_aces.append(ace)
else:
common_aces.append(ace)
other_aces = sorted(other_aces)
if len(other_aces) > 0:
res += 4*" " + "ACEs found only in %s:\n" % other.con.host
for ace in other_aces:
res += 8*" " + ace + "\n"
#
common_aces = sorted(list(set(common_aces)))
if self.con.verbose:
res += 4*" " + "ACEs found in both:\n"
for ace in common_aces:
res += 8*" " + ace + "\n"
return (self_aces == [] and other_aces == [], res)
class LDAPObject(object):
def __init__(self, connection, dn, summary, cmd_opts):
def __init__(self, connection, dn, summary):
self.con = connection
self.two_domains = cmd_opts.two
self.quiet = cmd_opts.quiet
self.verbose = cmd_opts.verbose
self.two_domains = self.con.two_domains
self.quiet = self.con.quiet
self.verbose = self.con.verbose
self.summary = summary
self.dn = dn.replace("${DOMAIN_DN}", self.con.base_dn)
self.dn = self.dn.replace("CN=${DOMAIN_NETBIOS}", "CN=%s" % self.con.domain_netbios)
for x in self.con.server_names:
self.dn = self.dn.replace("CN=${SERVERNAME}", "CN=%s" % x)
self.dn = self.dn.replace("CN=${SERVER_NAME}", "CN=%s" % x)
self.attributes = self.con.get_attributes(self.dn)
# Attributes that are considered always to be different e.g based on timestamp etc.
#
@ -199,7 +396,7 @@ class LDAPObject(object):
"dnsHostName", "networkAddress", "dnsRoot", "servicePrincipalName",]
self.domain_attributes = [x.upper() for x in self.domain_attributes]
#
# May contain DOMAIN_NETBIOS and SERVERNAME
# May contain DOMAIN_NETBIOS and SERVER_NAME
self.servername_attributes = [ "distinguishedName", "name", "CN", "sAMAccountName", "dNSHostName",
"servicePrincipalName", "rIDSetReferences", "serverReference", "serverReferenceBL",
"msDS-IsDomainFor", "interSiteTopologyGenerator",]
@ -249,10 +446,30 @@ class LDAPObject(object):
if not self.two_domains or len(self.con.server_names) > 1:
return res
for x in self.con.server_names:
res = res.upper().replace(x, "${SERVERNAME}")
res = res.upper().replace(x, "${SERVER_NAME}")
return res
def __eq__(self, other):
if self.con.descriptor:
return self.cmp_desc(other)
return self.cmp_attrs(other)
def cmp_desc(self, other):
d1 = Descriptor(self.con, self.dn)
d2 = Descriptor(other.con, other.dn)
if self.con.view == "section":
res = d1.diff_2(d2)
elif self.con.view == "collision":
res = d1.diff_1(d2)
else:
raise Exception("Unknown --view option value.")
#
self.screen_output = res[1][:-1]
other.screen_output = res[1][:-1]
#
return res[0]
def cmp_attrs(self, other):
res = ""
self.unique_attrs = []
self.df_value_attrs = []
@ -324,7 +541,7 @@ class LDAPObject(object):
continue
#
if x.upper() in self.servername_attributes:
# Attributes with SERVERNAME
# Attributes with SERVER_NAME
m = p
n = q
if not p and not q:
@ -370,12 +587,11 @@ class LDAPObject(object):
class LDAPBundel(object):
def __init__(self, connection, context, cmd_opts, dn_list=None):
def __init__(self, connection, context, dn_list=None):
self.con = connection
self.cmd_opts = cmd_opts
self.two_domains = cmd_opts.two
self.quiet = cmd_opts.quiet
self.verbose = cmd_opts.verbose
self.two_domains = self.con.two_domains
self.quiet = self.con.quiet
self.verbose = self.con.verbose
self.summary = {}
self.summary["unique_attrs"] = []
self.summary["df_value_attrs"] = []
@ -396,7 +612,7 @@ class LDAPBundel(object):
tmp = tmp.replace("CN=%s" % self.con.domain_netbios, "CN=${DOMAIN_NETBIOS}")
if len(self.con.server_names) == 1:
for x in self.con.server_names:
tmp = tmp.replace("CN=%s" % x, "CN=${SERVERNAME}")
tmp = tmp.replace("CN=%s" % x, "CN=${SERVER_NAME}")
self.dn_list[counter] = tmp
counter += 1
self.dn_list = list(set(self.dn_list))
@ -454,16 +670,14 @@ class LDAPBundel(object):
try:
object1 = LDAPObject(connection=self.con,
dn=self.dn_list[index],
summary=self.summary,
cmd_opts = self.cmd_opts)
summary=self.summary)
except LdbError, (ERR_NO_SUCH_OBJECT, _):
self.log( "\n!!! Object not found: %s" % self.dn_list[index] )
skip = True
try:
object2 = LDAPObject(connection=other.con,
dn=other.dn_list[index],
summary=other.summary,
cmd_opts = self.cmd_opts)
summary=other.summary)
except LdbError, (ERR_NO_SUCH_OBJECT, _):
self.log( "\n!!! Object not found: %s" % other.dn_list[index] )
skip = True
@ -471,7 +685,7 @@ class LDAPBundel(object):
index += 1
continue
if object1 == object2:
if self.verbose:
if self.con.verbose:
self.log( "\nComparing:" )
self.log( "'%s' [%s]" % (object1.dn, object1.con.host) )
self.log( "'%s' [%s]" % (object2.dn, object2.con.host) )
@ -542,6 +756,10 @@ if __name__ == "__main__":
help="Do not print anything but relay on just exit code",)
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
help="Print all DN pairs that have been compared",)
parser.add_option("", "--sd", dest="descriptor", action="store_true", default=False,
help="Compare nTSecurityDescriptor attibutes only",)
parser.add_option("", "--view", dest="view", default="section",
help="Display mode for nTSecurityDescriptor results. Possible values: section or collision.",)
(opts, args) = parser.parse_args()
lp = sambaopts.get_loadparm()
@ -566,6 +784,8 @@ if __name__ == "__main__":
if opts.verbose and opts.quiet:
parser.error("You cannot set --verbose and --quiet together")
if opts.descriptor and opts.view.upper() not in ["SECTION", "COLLISION"]:
parser.error("Unknown --view option value. Choose from: section or collision.")
con1 = LDAPBase(opts.host, opts, creds, lp)
assert len(con1.base_dn) > 0
@ -578,8 +798,8 @@ if __name__ == "__main__":
if not opts.quiet:
print "\n* Comparing [%s] context..." % context
b1 = LDAPBundel(con1, context=context, cmd_opts=opts)
b2 = LDAPBundel(con2, context=context, cmd_opts=opts)
b1 = LDAPBundel(con1, context=context)
b2 = LDAPBundel(con2, context=context)
if b1 == b2:
if not opts.quiet:
@ -587,16 +807,14 @@ if __name__ == "__main__":
else:
if not opts.quiet:
print "\n* Result for [%s]: FAILURE" % context
print "\nSUMMARY"
print "---------"
if not opts.descriptor:
assert len(b1.summary["df_value_attrs"]) == len(b2.summary["df_value_attrs"])
b2.summary["df_value_attrs"] = []
print "\nSUMMARY"
print "---------"
b1.print_summary()
b2.print_summary()
# mark exit status as FAILURE if a least one comparison failed
status = -1
assert len(b1.summary["df_value_attrs"]) == len(b2.summary["df_value_attrs"])
b2.summary["df_value_attrs"] = []
if not opts.quiet:
b1.print_summary()
b2.print_summary()
sys.exit(status)