1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-08 04:58:40 +03:00
Zahari Zahariev 73763b3678 LDAPCmp feature to compare nTSecurityDescriptors
New feature that enables LDAPCmp users to find unmatched or
missing ACEs in objects for the three naming contexts between
DCs in one domain (default) or different domains. Comparing
security descriptors is not the default action but attribute
compatison. So to activate the new mode there is --sd switch.
However there are two view modes to the new --sd action which
are 'section' (default) or 'collision'. In 'section' mode you
can only find differences connected to missing or value
unmatched ACEs but not disorder unmatch if ACE values and count
are the same. All of the mentioned differences plus disorder
ACE unmatch you can observe under 'collision' view however
it is more verbose.

Signed-off-by: Anatoliy Atanasov <anatoliy.atanasov@postpath.com>
2010-09-30 09:41:20 -07:00

821 lines
33 KiB
Python
Executable File

#!/usr/bin/env python
#
# Unix SMB/CIFS implementation.
# A script to compare differences of objects and attributes between
# two LDAP servers both running at the same time. It generally compares
# one of the three pratitions DOMAIN, CONFIGURATION or SCHEMA. Users
# that have to be provided sheould be able to read objects in any of the
# above partitions.
# Copyright (C) Zahari Zahariev <zahari.zahariev@postpath.com> 2009, 2010
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import re
import sys
from optparse import OptionParser
sys.path.insert(0, "bin/python")
import samba
import samba.getopt as options
from samba import Ldb
from samba.ndr import ndr_pack, ndr_unpack
from samba.dcerpc import security
from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, ERR_NO_SUCH_OBJECT, LdbError
global summary
summary = {}
class LDAPBase(object):
def __init__(self, host, cmd_opts, creds, lp):
ldb_options = []
samdb_url = host
if not "://" in host:
if os.path.isfile(host):
samdb_url = "tdb://%s" % host
else:
samdb_url = "ldap://%s:389" % host
# use 'paged_search' module when connecting remotely
if samdb_url.lower().startswith("ldap://"):
ldb_options = ["modules:paged_searches"]
self.ldb = Ldb(url=samdb_url,
credentials=creds,
lp=lp,
options=ldb_options)
self.two_domains = cmd_opts.two
self.quiet = cmd_opts.quiet
self.descriptor = cmd_opts.descriptor
self.view = cmd_opts.view
self.verbose = cmd_opts.verbose
self.host = host
self.base_dn = self.find_basedn()
self.domain_netbios = self.find_netbios()
self.server_names = self.find_servers()
self.domain_name = re.sub("[Dd][Cc]=", "", self.base_dn).replace(",", ".")
self.domain_sid = self.find_domain_sid()
self.get_guid_map()
self.get_sid_map()
#
# Log some domain controller specific place-holers that are being used
# when compare content of two DCs. Uncomment for DEBUG purposes.
if self.two_domains and not self.quiet:
print "\n* Place-holders for %s:" % self.host
print 4*" " + "${DOMAIN_DN} => %s" % self.base_dn
print 4*" " + "${DOMAIN_NETBIOS} => %s" % self.domain_netbios
print 4*" " + "${SERVER_NAME} => %s" % self.server_names
print 4*" " + "${DOMAIN_NAME} => %s" % self.domain_name
def find_domain_sid(self):
res = self.ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
def find_servers(self):
"""
"""
res = self.ldb.search(base="OU=Domain Controllers,%s" % self.base_dn, \
scope=SCOPE_SUBTREE, expression="(objectClass=computer)", attrs=["cn"])
assert len(res) > 0
srv = []
for x in res:
srv.append(x["cn"][0])
return srv
def find_netbios(self):
res = self.ldb.search(base="CN=Partitions,CN=Configuration,%s" % self.base_dn, \
scope=SCOPE_SUBTREE, attrs=["nETBIOSName"])
assert len(res) > 0
for x in res:
if "nETBIOSName" in x.keys():
return x["nETBIOSName"][0]
def find_basedn(self):
res = self.ldb.search(base="", expression="(objectClass=*)", scope=SCOPE_BASE,
attrs=["defaultNamingContext"])
assert len(res) == 1
return res[0]["defaultNamingContext"][0]
def object_exists(self, object_dn):
res = None
try:
res = self.ldb.search(base=object_dn, scope=SCOPE_BASE, expression="(objectClass=*)")
except LdbError, (ERR_NO_SUCH_OBJECT, _):
return False
return len(res) == 1
def get_object_sid(self, object_dn):
try:
res = self.ldb.search(base=object_dn, expression="(objectClass=*)", scope=SCOPE_BASE, attrs=["objectSid"])
except LdbError, (ERR_NO_SUCH_OBJECT, _):
raise Exception("DN sintax is wrong or object does't exist: " + object_dn)
assert len(res) == 1
return res[0]["objectSid"][0]
def delete_force(self, object_dn):
try:
self.ldb.delete(object_dn)
except Ldb.LdbError, e:
assert "No such object" in str(e)
def get_attributes(self, object_dn):
""" Returns dict with all default visible attributes
"""
res = self.ldb.search(base=object_dn, scope=SCOPE_BASE, attrs=["*"])
assert len(res) == 1
res = dict(res[0])
# 'Dn' element is not iterable and we have it as 'distinguishedName'
del res["dn"]
for key in res.keys():
res[key] = list(res[key])
return res
def get_descriptor_sddl(self, object_dn):
res = self.ldb.search(base=object_dn, scope=SCOPE_BASE, attrs=["nTSecurityDescriptor"])
desc = res[0]["nTSecurityDescriptor"][0]
desc = ndr_unpack(security.descriptor, desc)
return desc.as_sddl(self.domain_sid)
def guid_as_string(self, guid_blob):
""" Translate binary representation of schemaIDGUID to standard string representation.
@gid_blob: binary schemaIDGUID
"""
blob = "%s" % guid_blob
stops = [4, 2, 2, 2, 6]
index = 0
res = ""
x = 0
while x < len(stops):
tmp = ""
y = 0
while y < stops[x]:
c = hex(ord(blob[index])).replace("0x", "")
c = [None, "0" + c, c][len(c)]
if 2 * index < len(blob):
tmp = c + tmp
else:
tmp += c
index += 1
y += 1
res += tmp + " "
x += 1
assert index == len(blob)
return res.strip().replace(" ", "-")
def get_guid_map(self):
""" Build dictionary that maps GUID to 'name' attribute found in Schema or Extended-Rights.
"""
self.guid_map = {}
res = self.ldb.search(base="cn=schema,cn=configuration,%s" % self.base_dn, \
expression="(schemaIdGuid=*)", scope=SCOPE_SUBTREE, attrs=["schemaIdGuid", "name"])
for item in res:
self.guid_map[self.guid_as_string(item["schemaIdGuid"]).lower()] = item["name"][0]
#
res = self.ldb.search(base="cn=extended-rights,cn=configuration,%s" % self.base_dn, \
expression="(rightsGuid=*)", scope=SCOPE_SUBTREE, attrs=["rightsGuid", "name"])
for item in res:
self.guid_map[str(item["rightsGuid"]).lower()] = item["name"][0]
def get_sid_map(self):
""" Build dictionary that maps GUID to 'name' attribute found in Schema or Extended-Rights.
"""
self.sid_map = {}
res = self.ldb.search(base="%s" % self.base_dn, \
expression="(objectSid=*)", scope=SCOPE_SUBTREE, attrs=["objectSid", "sAMAccountName"])
for item in res:
try:
self.sid_map["%s" % ndr_unpack(security.dom_sid, item["objectSid"][0])] = item["sAMAccountName"][0]
except KeyError:
pass
class Descriptor(object):
def __init__(self, connection, dn):
self.con = connection
self.dn = dn
self.sddl = self.con.get_descriptor_sddl(self.dn)
self.dacl_list = self.extract_dacl()
def extract_dacl(self):
""" Extracts the DACL as a list of ACE string (with the brakets).
"""
try:
res = re.search("D:(.*?)(\(.*?\))S:", self.sddl).group(2)
except AttributeError:
return []
return re.findall("(\(.*?\))", res)
def fix_guid(self, ace):
res = "%s" % ace
guids = re.findall("[a-z0-9]+?-[a-z0-9]+-[a-z0-9]+-[a-z0-9]+-[a-z0-9]+", res)
# If there are not GUIDs to replace return the same ACE
if len(guids) == 0:
return res
for guid in guids:
try:
name = self.con.guid_map[guid.lower()]
res = res.replace(guid, name)
except KeyError:
# Do not bother if the GUID is not found in
# cn=Schema or cn=Extended-Rights
pass
return res
def fix_sid(self, ace):
res = "%s" % ace
sids = re.findall("S-[-0-9]+", res)
# If there are not SIDs to replace return the same ACE
if len(sids) == 0:
return res
for sid in sids:
try:
name = self.con.sid_map[sid]
res = res.replace(sid, name)
except KeyError:
# Do not bother if the SID is not found in baseDN
pass
return res
def fixit(self, ace):
""" Combine all replacement methods in one
"""
res = "%s" % ace
res = self.fix_guid(res)
res = self.fix_sid(res)
return res
def diff_1(self, other):
res = ""
if len(self.dacl_list) != len(other.dacl_list):
res += 4*" " + "Difference in ACE count:\n"
res += 8*" " + "=> %s\n" % len(self.dacl_list)
res += 8*" " + "=> %s\n" % len(other.dacl_list)
#
i = 0
flag = True
while True:
self_ace = None
other_ace = None
try:
self_ace = "%s" % self.dacl_list[i]
except IndexError:
self_ace = ""
#
try:
other_ace = "%s" % other.dacl_list[i]
except IndexError:
other_ace = ""
if len(self_ace) + len(other_ace) == 0:
break
self_ace_fixed = "%s" % self.fixit(self_ace)
other_ace_fixed = "%s" % other.fixit(other_ace)
if self_ace_fixed != other_ace_fixed:
res += "%60s * %s\n" % ( self_ace_fixed, other_ace_fixed )
flag = False
else:
res += "%60s | %s\n" % ( self_ace_fixed, other_ace_fixed )
i += 1
return (flag, res)
def diff_2(self, other):
res = ""
if len(self.dacl_list) != len(other.dacl_list):
res += 4*" " + "Difference in ACE count:\n"
res += 8*" " + "=> %s\n" % len(self.dacl_list)
res += 8*" " + "=> %s\n" % len(other.dacl_list)
#
common_aces = []
self_aces = []
other_aces = []
self_dacl_list_fixed = []
other_dacl_list_fixed = []
[self_dacl_list_fixed.append( self.fixit(ace) ) for ace in self.dacl_list]
[other_dacl_list_fixed.append( other.fixit(ace) ) for ace in other.dacl_list]
for ace in self_dacl_list_fixed:
try:
other_dacl_list_fixed.index(ace)
except ValueError:
self_aces.append(ace)
else:
common_aces.append(ace)
self_aces = sorted(self_aces)
if len(self_aces) > 0:
res += 4*" " + "ACEs found only in %s:\n" % self.con.host
for ace in self_aces:
res += 8*" " + ace + "\n"
#
for ace in other_dacl_list_fixed:
try:
self_dacl_list_fixed.index(ace)
except ValueError:
other_aces.append(ace)
else:
common_aces.append(ace)
other_aces = sorted(other_aces)
if len(other_aces) > 0:
res += 4*" " + "ACEs found only in %s:\n" % other.con.host
for ace in other_aces:
res += 8*" " + ace + "\n"
#
common_aces = sorted(list(set(common_aces)))
if self.con.verbose:
res += 4*" " + "ACEs found in both:\n"
for ace in common_aces:
res += 8*" " + ace + "\n"
return (self_aces == [] and other_aces == [], res)
class LDAPObject(object):
def __init__(self, connection, dn, summary):
self.con = connection
self.two_domains = self.con.two_domains
self.quiet = self.con.quiet
self.verbose = self.con.verbose
self.summary = summary
self.dn = dn.replace("${DOMAIN_DN}", self.con.base_dn)
self.dn = self.dn.replace("CN=${DOMAIN_NETBIOS}", "CN=%s" % self.con.domain_netbios)
for x in self.con.server_names:
self.dn = self.dn.replace("CN=${SERVER_NAME}", "CN=%s" % x)
self.attributes = self.con.get_attributes(self.dn)
# Attributes that are considered always to be different e.g based on timestamp etc.
#
# One domain - two domain controllers
self.ignore_attributes = [
# Default Naming Context
"lastLogon", "lastLogoff", "badPwdCount", "logonCount", "badPasswordTime", "modifiedCount",
"operatingSystemVersion","oEMInformation",
# Configuration Naming Context
"repsFrom", "dSCorePropagationData", "msExchServer1HighestUSN",
"replUpToDateVector", "repsTo", "whenChanged", "uSNChanged", "uSNCreated",
# Schema Naming Context
"prefixMap",]
self.dn_attributes = []
self.domain_attributes = []
self.servername_attributes = []
self.netbios_attributes = []
self.other_attributes = []
# Two domains - two domain controllers
if self.two_domains:
self.ignore_attributes += [
"objectCategory", "objectGUID", "objectSid", "whenCreated", "pwdLastSet", "uSNCreated", "creationTime",
"modifiedCount", "priorSetTime", "rIDManagerReference", "gPLink", "ipsecNFAReference",
"fRSPrimaryMember", "fSMORoleOwner", "masteredBy", "ipsecOwnersReference", "wellKnownObjects",
"badPwdCount", "ipsecISAKMPReference", "ipsecFilterReference", "msDs-masteredBy", "lastSetTime",
"ipsecNegotiationPolicyReference", "subRefs", "gPCFileSysPath", "accountExpires", "invocationId",
# After Exchange preps
"targetAddress", "msExchMailboxGuid", "siteFolderGUID"]
#
# Attributes that contain the unique DN tail part e.g. 'DC=samba,DC=org'
self.dn_attributes = [
"distinguishedName", "defaultObjectCategory", "member", "memberOf", "siteList", "nCName",
"homeMDB", "homeMTA", "interSiteTopologyGenerator", "serverReference",
"msDS-HasInstantiatedNCs", "hasMasterNCs", "msDS-hasMasterNCs", "msDS-HasDomainNCs", "dMDLocation",
"msDS-IsDomainFor", "rIDSetReferences", "serverReferenceBL",
# After Exchange preps
"msExchHomeRoutingGroup", "msExchResponsibleMTAServer", "siteFolderServer", "msExchRoutingMasterDN",
"msExchRoutingGroupMembersBL", "homeMDBBL", "msExchHomePublicMDB", "msExchOwningServer", "templateRoots",
"addressBookRoots", "msExchPolicyRoots", "globalAddressList", "msExchOwningPFTree",
"msExchResponsibleMTAServerBL", "msExchOwningPFTreeBL",]
self.dn_attributes = [x.upper() for x in self.dn_attributes]
#
# Attributes that contain the Domain name e.g. 'samba.org'
self.domain_attributes = [
"proxyAddresses", "mail", "userPrincipalName", "msExchSmtpFullyQualifiedDomainName",
"dnsHostName", "networkAddress", "dnsRoot", "servicePrincipalName",]
self.domain_attributes = [x.upper() for x in self.domain_attributes]
#
# May contain DOMAIN_NETBIOS and SERVER_NAME
self.servername_attributes = [ "distinguishedName", "name", "CN", "sAMAccountName", "dNSHostName",
"servicePrincipalName", "rIDSetReferences", "serverReference", "serverReferenceBL",
"msDS-IsDomainFor", "interSiteTopologyGenerator",]
self.servername_attributes = [x.upper() for x in self.servername_attributes]
#
self.netbios_attributes = [ "servicePrincipalName", "CN", "distinguishedName", "nETBIOSName", "name",]
self.netbios_attributes = [x.upper() for x in self.netbios_attributes]
#
self.other_attributes = [ "name", "DC",]
self.other_attributes = [x.upper() for x in self.other_attributes]
#
self.ignore_attributes = [x.upper() for x in self.ignore_attributes]
def log(self, msg):
"""
Log on the screen if there is no --quiet oprion set
"""
if not self.quiet:
print msg
def fix_dn(self, s):
res = "%s" % s
if not self.two_domains:
return res
if res.upper().endswith(self.con.base_dn.upper()):
res = res[:len(res)-len(self.con.base_dn)] + "${DOMAIN_DN}"
return res
def fix_domain_name(self, s):
res = "%s" % s
if not self.two_domains:
return res
res = res.replace(self.con.domain_name.lower(), self.con.domain_name.upper())
res = res.replace(self.con.domain_name.upper(), "${DOMAIN_NAME}")
return res
def fix_domain_netbios(self, s):
res = "%s" % s
if not self.two_domains:
return res
res = res.replace(self.con.domain_netbios.lower(), self.con.domain_netbios.upper())
res = res.replace(self.con.domain_netbios.upper(), "${DOMAIN_NETBIOS}")
return res
def fix_server_name(self, s):
res = "%s" % s
if not self.two_domains or len(self.con.server_names) > 1:
return res
for x in self.con.server_names:
res = res.upper().replace(x, "${SERVER_NAME}")
return res
def __eq__(self, other):
if self.con.descriptor:
return self.cmp_desc(other)
return self.cmp_attrs(other)
def cmp_desc(self, other):
d1 = Descriptor(self.con, self.dn)
d2 = Descriptor(other.con, other.dn)
if self.con.view == "section":
res = d1.diff_2(d2)
elif self.con.view == "collision":
res = d1.diff_1(d2)
else:
raise Exception("Unknown --view option value.")
#
self.screen_output = res[1][:-1]
other.screen_output = res[1][:-1]
#
return res[0]
def cmp_attrs(self, other):
res = ""
self.unique_attrs = []
self.df_value_attrs = []
other.unique_attrs = []
if self.attributes.keys() != other.attributes.keys():
#
title = 4*" " + "Attributes found only in %s:" % self.con.host
for x in self.attributes.keys():
if not x in other.attributes.keys() and \
not x.upper() in [q.upper() for q in other.ignore_attributes]:
if title:
res += title + "\n"
title = None
res += 8*" " + x + "\n"
self.unique_attrs.append(x)
#
title = 4*" " + "Attributes found only in %s:" % other.con.host
for x in other.attributes.keys():
if not x in self.attributes.keys() and \
not x.upper() in [q.upper() for q in self.ignore_attributes]:
if title:
res += title + "\n"
title = None
res += 8*" " + x + "\n"
other.unique_attrs.append(x)
#
missing_attrs = [x.upper() for x in self.unique_attrs]
missing_attrs += [x.upper() for x in other.unique_attrs]
title = 4*" " + "Difference in attribute values:"
for x in self.attributes.keys():
if x.upper() in self.ignore_attributes or x.upper() in missing_attrs:
continue
if isinstance(self.attributes[x], list) and isinstance(other.attributes[x], list):
self.attributes[x] = sorted(self.attributes[x])
other.attributes[x] = sorted(other.attributes[x])
if self.attributes[x] != other.attributes[x]:
p = None
q = None
m = None
n = None
# First check if the difference can be fixed but shunting the first part
# of the DomainHostName e.g. 'mysamba4.test.local' => 'mysamba4'
if x.upper() in self.other_attributes:
p = [self.con.domain_name.split(".")[0] == j for j in self.attributes[x]]
q = [other.con.domain_name.split(".")[0] == j for j in other.attributes[x]]
if p == q:
continue
# Attribute values that are list that contain DN based values that may differ
elif x.upper() in self.dn_attributes:
m = p
n = q
if not p and not q:
m = self.attributes[x]
n = other.attributes[x]
p = [self.fix_dn(j) for j in m]
q = [other.fix_dn(j) for j in n]
if p == q:
continue
# Attributes that contain the Domain name in them
if x.upper() in self.domain_attributes:
m = p
n = q
if not p and not q:
m = self.attributes[x]
n = other.attributes[x]
p = [self.fix_domain_name(j) for j in m]
q = [other.fix_domain_name(j) for j in n]
if p == q:
continue
#
if x.upper() in self.servername_attributes:
# Attributes with SERVER_NAME
m = p
n = q
if not p and not q:
m = self.attributes[x]
n = other.attributes[x]
p = [self.fix_server_name(j) for j in m]
q = [other.fix_server_name(j) for j in n]
if p == q:
continue
#
if x.upper() in self.netbios_attributes:
# Attributes with NETBIOS Domain name
m = p
n = q
if not p and not q:
m = self.attributes[x]
n = other.attributes[x]
p = [self.fix_domain_netbios(j) for j in m]
q = [other.fix_domain_netbios(j) for j in n]
if p == q:
continue
#
if title:
res += title + "\n"
title = None
if p and q:
res += 8*" " + x + " => \n%s\n%s" % (p, q) + "\n"
else:
res += 8*" " + x + " => \n%s\n%s" % (self.attributes[x], other.attributes[x]) + "\n"
self.df_value_attrs.append(x)
#
if self.unique_attrs + other.unique_attrs != []:
assert self.unique_attrs != other.unique_attrs
self.summary["unique_attrs"] += self.unique_attrs
self.summary["df_value_attrs"] += self.df_value_attrs
other.summary["unique_attrs"] += other.unique_attrs
other.summary["df_value_attrs"] += self.df_value_attrs # they are the same
#
self.screen_output = res[:-1]
other.screen_output = res[:-1]
#
return res == ""
class LDAPBundel(object):
def __init__(self, connection, context, dn_list=None):
self.con = connection
self.two_domains = self.con.two_domains
self.quiet = self.con.quiet
self.verbose = self.con.verbose
self.summary = {}
self.summary["unique_attrs"] = []
self.summary["df_value_attrs"] = []
self.summary["known_ignored_dn"] = []
self.summary["abnormal_ignored_dn"] = []
if dn_list:
self.dn_list = dn_list
elif context.upper() in ["DOMAIN", "CONFIGURATION", "SCHEMA"]:
self.context = context.upper()
self.dn_list = self.get_dn_list(context)
else:
raise Exception("Unknown initialization data for LDAPBundel().")
counter = 0
while counter < len(self.dn_list) and self.two_domains:
# Use alias reference
tmp = self.dn_list[counter]
tmp = tmp[:len(tmp)-len(self.con.base_dn)] + "${DOMAIN_DN}"
tmp = tmp.replace("CN=%s" % self.con.domain_netbios, "CN=${DOMAIN_NETBIOS}")
if len(self.con.server_names) == 1:
for x in self.con.server_names:
tmp = tmp.replace("CN=%s" % x, "CN=${SERVER_NAME}")
self.dn_list[counter] = tmp
counter += 1
self.dn_list = list(set(self.dn_list))
self.dn_list = sorted(self.dn_list)
self.size = len(self.dn_list)
def log(self, msg):
"""
Log on the screen if there is no --quiet oprion set
"""
if not self.quiet:
print msg
def update_size(self):
self.size = len(self.dn_list)
self.dn_list = sorted(self.dn_list)
def __eq__(self, other):
res = True
if self.size != other.size:
self.log( "\n* DN lists have different size: %s != %s" % (self.size, other.size) )
res = False
#
title= "\n* DNs found only in %s:" % self.con.host
for x in self.dn_list:
if not x.upper() in [q.upper() for q in other.dn_list]:
if title:
self.log( title )
title = None
res = False
self.log( 4*" " + x )
self.dn_list[self.dn_list.index(x)] = ""
self.dn_list = [x for x in self.dn_list if x]
#
title= "\n* DNs found only in %s:" % other.con.host
for x in other.dn_list:
if not x.upper() in [q.upper() for q in self.dn_list]:
if title:
self.log( title )
title = None
res = False
self.log( 4*" " + x )
other.dn_list[other.dn_list.index(x)] = ""
other.dn_list = [x for x in other.dn_list if x]
#
self.update_size()
other.update_size()
assert self.size == other.size
assert sorted([x.upper() for x in self.dn_list]) == sorted([x.upper() for x in other.dn_list])
self.log( "\n* Objects to be compared: %s" % self.size )
index = 0
while index < self.size:
skip = False
try:
object1 = LDAPObject(connection=self.con,
dn=self.dn_list[index],
summary=self.summary)
except LdbError, (ERR_NO_SUCH_OBJECT, _):
self.log( "\n!!! Object not found: %s" % self.dn_list[index] )
skip = True
try:
object2 = LDAPObject(connection=other.con,
dn=other.dn_list[index],
summary=other.summary)
except LdbError, (ERR_NO_SUCH_OBJECT, _):
self.log( "\n!!! Object not found: %s" % other.dn_list[index] )
skip = True
if skip:
index += 1
continue
if object1 == object2:
if self.con.verbose:
self.log( "\nComparing:" )
self.log( "'%s' [%s]" % (object1.dn, object1.con.host) )
self.log( "'%s' [%s]" % (object2.dn, object2.con.host) )
self.log( 4*" " + "OK" )
else:
self.log( "\nComparing:" )
self.log( "'%s' [%s]" % (object1.dn, object1.con.host) )
self.log( "'%s' [%s]" % (object2.dn, object2.con.host) )
self.log( object1.screen_output )
self.log( 4*" " + "FAILED" )
res = False
self.summary = object1.summary
other.summary = object2.summary
index += 1
#
return res
def get_dn_list(self, context):
""" Query LDAP server about the DNs of certain naming self.con.ext Domain (or Default), Configuration, Schema.
Parse all DNs and filter those that are 'strange' or abnormal.
"""
if context.upper() == "DOMAIN":
search_base = "%s" % self.con.base_dn
elif context.upper() == "CONFIGURATION":
search_base = "CN=Configuration,%s" % self.con.base_dn
elif context.upper() == "SCHEMA":
search_base = "CN=Schema,CN=Configuration,%s" % self.con.base_dn
dn_list = []
res = self.con.ldb.search(base=search_base, scope=SCOPE_SUBTREE, attrs=["dn"])
for x in res:
dn_list.append(x["dn"].get_linearized())
#
global summary
#
return dn_list
def print_summary(self):
self.summary["unique_attrs"] = list(set(self.summary["unique_attrs"]))
self.summary["df_value_attrs"] = list(set(self.summary["df_value_attrs"]))
#
if self.summary["unique_attrs"]:
self.log( "\nAttributes found only in %s:" % self.con.host )
self.log( "".join([str("\n" + 4*" " + x) for x in self.summary["unique_attrs"]]) )
#
if self.summary["df_value_attrs"]:
self.log( "\nAttributes with different values:" )
self.log( "".join([str("\n" + 4*" " + x) for x in self.summary["df_value_attrs"]]) )
self.summary["df_value_attrs"] = []
###
if __name__ == "__main__":
parser = OptionParser("ldapcmp [options] domain|configuration|schema")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
credopts = options.CredentialsOptionsDouble(parser)
parser.add_option_group(credopts)
parser.add_option("", "--host", dest="host",
help="IP of the first LDAP server",)
parser.add_option("", "--host2", dest="host2",
help="IP of the second LDAP server",)
parser.add_option("-w", "--two", dest="two", action="store_true", default=False,
help="Hosts are in two different domains",)
parser.add_option("-q", "--quiet", dest="quiet", action="store_true", default=False,
help="Do not print anything but relay on just exit code",)
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
help="Print all DN pairs that have been compared",)
parser.add_option("", "--sd", dest="descriptor", action="store_true", default=False,
help="Compare nTSecurityDescriptor attibutes only",)
parser.add_option("", "--view", dest="view", default="section",
help="Display mode for nTSecurityDescriptor results. Possible values: section or collision.",)
(opts, args) = parser.parse_args()
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
creds2 = credopts.get_credentials2(lp)
if creds2.is_anonymous():
creds2 = creds
if creds.is_anonymous():
parser.error("You must supply at least one username/password pair")
# make a list of contexts to compare in
contexts = []
if len(args) == 0:
# if no argument given, we compare all contexts
contexts = ["DOMAIN", "CONFIGURATION", "SCHEMA"]
else:
for context in args:
if not context.upper() in ["DOMAIN", "CONFIGURATION", "SCHEMA"]:
parser.error("Incorrect argument: %s" % context)
contexts.append(context.upper())
if opts.verbose and opts.quiet:
parser.error("You cannot set --verbose and --quiet together")
if opts.descriptor and opts.view.upper() not in ["SECTION", "COLLISION"]:
parser.error("Unknown --view option value. Choose from: section or collision.")
con1 = LDAPBase(opts.host, opts, creds, lp)
assert len(con1.base_dn) > 0
con2 = LDAPBase(opts.host2, opts, creds2, lp)
assert len(con2.base_dn) > 0
status = 0
for context in contexts:
if not opts.quiet:
print "\n* Comparing [%s] context..." % context
b1 = LDAPBundel(con1, context=context)
b2 = LDAPBundel(con2, context=context)
if b1 == b2:
if not opts.quiet:
print "\n* Result for [%s]: SUCCESS" % context
else:
if not opts.quiet:
print "\n* Result for [%s]: FAILURE" % context
if not opts.descriptor:
assert len(b1.summary["df_value_attrs"]) == len(b2.summary["df_value_attrs"])
b2.summary["df_value_attrs"] = []
print "\nSUMMARY"
print "---------"
b1.print_summary()
b2.print_summary()
# mark exit status as FAILURE if a least one comparison failed
status = -1
sys.exit(status)