mirror of https://github.com/samba-team/samba.git synced 2024-12-23 17:34:34 +03:00

Comparison tool for LDAP servers (using Ldb)

This tool is integrated with Samba4 Ldb. It provides a useful output
where you can find easy differences in objects or attributes within
naming context (Domain, Configuration or Schema).

Added functionality for two sets of credentials.
This commit is contained in:
Zahari Zahariev 2010-01-13 10:41:56 +02:00 committed by Nadezhda Ivanova
parent 9b3871ed29
commit 5d1aa4c5b7
2 changed files with 501 additions and 0 deletions

source4/scripting/devel/ldapcmp Executable file
View File

@ -0,0 +1,449 @@
# Unix SMB/CIFS implementation.
# A script to compare differences of objects and attributes between
# two LDAP servers both running at the same time. It generally compares
# one of the three pratitions DOMAIN, CONFIGURATION or SCHEMA. Users
# that have to be provided sheould be able to read objects in any of the
# above partitions.
# Copyright (C) Zahari Zahariev <zahari.zahariev@postpath.com> 2009
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import sys
from optparse import OptionParser
sys.path.insert(0, "bin/python")
import samba
import samba.getopt as options
from samba import Ldb
from samba.ndr import ndr_pack, ndr_unpack
from samba.dcerpc import security
global summary
summary = {}
class LDAPBase(object):
def __init__(self, host, creds, lp):
if not "://" in host:
self.host = "ldap://" + host + ":389"
self.ldb = Ldb(self.host, credentials=creds, lp=lp,
self.base_dn = self.find_basedn()
self.netbios_name = self.find_netbios()
self.domain_name = re.sub("[Dd][Cc]=", "", self.base_dn).replace(",", ".")
self.domain_sid_bin = self.get_object_sid(self.base_dn)
def find_netbios(self):
res = self.ldb.search(base="CN=Partitions,CN=Configuration,%s" % self.base_dn, \
scope=SCOPE_SUBTREE, attrs=["nETBIOSName"])
assert len(res) > 0
for x in res:
if "nETBIOSName" in x.keys():
return x["nETBIOSName"][0]
def find_basedn(self):
res = self.ldb.search(base="", expression="(objectClass=*)", scope=SCOPE_BASE,
assert len(res) == 1
return res[0]["defaultNamingContext"][0]
def object_exists(self, object_dn):
res = None
res = self.ldb.search(base=object_dn, scope=SCOPE_BASE, expression="(objectClass=*)")
except LdbError, (ERR_NO_SUCH_OBJECT, _):
return False
return len(res) == 1
def get_object_sid(self, object_dn):
res = self.ldb.search(base=object_dn, expression="(objectClass=*)", scope=SCOPE_BASE, attrs=["objectSid"])
except LdbError, (ERR_NO_SUCH_OBJECT, _):
raise Exception("DN sintax is wrong or object does't exist: " + object_dn)
assert len(res) == 1
return res[0]["objectSid"][0]
def delete_force(self, object_dn):
except Ldb.LdbError, e:
assert "No such object" in str(e)
def get_attributes(self, object_dn):
""" Returns dict with all default visible attributes
res = self.ldb.search(base=object_dn, scope=SCOPE_BASE, attrs=["*"])
assert len(res) == 1
res = dict(res[0])
# 'Dn' element is not iterable and we have it as 'distinguishedName'
del res["dn"]
for key in res.keys():
res[key] = list(res[key])
return res
def get_descriptor(self, object_dn):
res = self.ldb.search(base=object_dn, scope=SCOPE_BASE, attrs=["nTSecurityDescriptor"])
return res[0]["nTSecurityDescriptor"][0]
class AdObject(object):
def __init__(self, con, dn, summary):
self.con = con
self.summary = summary
self.dn = dn.replace("${DOMAIN_DN}", self.con.base_dn)
self.attributes = self.con.get_attributes(self.dn)
# attributes that are considered always to be different e.g based on timestamp etc.
self.ignore_attributes = ["objectCategory", "objectGUID", \
"whenChanged", "objectSid", "whenCreated", "uSNChanged", "pwdLastSet", \
"uSNCreated", "logonCount", "badPasswordTime", "lastLogon", "creationTime", \
"modifiedCount", "priorSetTime", "rIDManagerReference", "gPLink", "ipsecNFAReference", \
"fRSPrimaryMember", "fSMORoleOwner", "masteredBy", "ipsecOwnersReference", "wellKnownObjects", \
"badPwdCount", "ipsecISAKMPReference", "ipsecFilterReference", "msDs-masteredBy", "lastSetTime", \
"ipsecNegotiationPolicyReference", "subRefs", "gPCFileSysPath", "accountExpires", "dSCorePropagationData", \
# After Exchange preps
"targetAddress", "msExchMailboxGuid", "siteFolderGUID"]
#self.ignore_attributes = []
self.ignore_attributes = [x.upper() for x in self.ignore_attributes]
# Attributes that contain the unique DN tail part e.g. 'DC=samba,DC=org'
self.dn_attributes = ["distinguishedName", "defaultObjectCategory", \
"member", "memberOf", "siteList", "nCName", "homeMDB", "homeMTA", "interSiteTopologyGenerator", \
# After Exchange preps
"msExchHomeRoutingGroup", "msExchResponsibleMTAServer", "siteFolderServer", "msExchRoutingMasterDN", \
"msExchRoutingGroupMembersBL", "homeMDBBL", "msExchHomePublicMDB", "msExchOwningServer", "templateRoots", \
"addressBookRoots", "msExchPolicyRoots", "globalAddressList", "msExchOwningPFTree", \
"msExchResponsibleMTAServerBL", "msExchOwningPFTreeBL",]
self.dn_attributes = [x.upper() for x in self.dn_attributes]
# Attributes that contain the Domain name e.g. 'samba.org'
self.domain_attributes = ["proxyAddresses", "mail", "userPrincipalName", "msExchSmtpFullyQualifiedDomainName", \
"dnsHostName", "networkAddress", "dnsRoot", "servicePrincipalName"]
self.domain_attributes = [x.upper() for x in self.domain_attributes]
def fix_dn(self, s):
res = "%s" % s
if res.upper().endswith(self.con.base_dn.upper()):
res = res[:len(res)-len(self.con.base_dn)] + "${DOMAIN_DN}"
return res
def fix_domain_name(self, s):
res = "%s" % s
if res.upper().endswith(self.con.domain_name.upper()):
res = res[:len(res)-len(self.con.domain_name)] + "${DOMAIN_NAME}"
return res
def fix_netbios_name(self, s):
res = "%s" % s
if res.upper().endswith(self.con.netbios_name.upper()):
res = res[:len(res)-len(self.con.netbios_name)] + "${NETBIOS_NAME}"
return res
def __eq__(self, other):
res = True
self.unique_attrs = []
self.df_value_attrs = []
other.unique_attrs = []
if self.attributes.keys() != other.attributes.keys():
print 4*" " + "Different number of attributes!"
title = 4*" " + "Attributes found only in %s:" % self.con.base_dn
for x in self.attributes.keys():
if not x.upper() in [q.upper() for q in other.attributes.keys()]:
if title:
print title
title = None
print 8*" " + x
title = 4*" " + "Attributes found only in %s:" % other.con.base_dn
for x in other.attributes.keys():
if not x.upper() in [q.upper() for q in self.attributes.keys()]:
if title:
print title
title = None
print 8*" " + x
res = False
missing_attrs = [x.upper() for x in self.unique_attrs]
missing_attrs += [x.upper() for x in other.unique_attrs]
title = 4*" " + "Difference in attribute values:"
for x in self.attributes.keys():
if x.upper() in self.ignore_attributes or x.upper() in missing_attrs:
if isinstance(self.attributes[x], list) and isinstance(other.attributes[x], list):
self.attributes[x] = sorted(self.attributes[x])
other.attributes[x] = sorted(other.attributes[x])
if self.attributes[x] != other.attributes[x]:
p = None
q = None
# Attribute values that are list that contain DN based values that may differ
if x.upper() in self.dn_attributes:
p = [self.fix_dn(j) for j in self.attributes[x]]
q = [other.fix_dn(j) for j in other.attributes[x]]
if p == q:
elif x.upper() in ["DC",]:
# Usually displayed as the first part of the Domain DN
p = [self.con.domain_name.split(".")[0] == j for j in self.attributes[x]]
q = [other.con.domain_name.split(".")[0] == j for j in other.attributes[x]]
if p == q:
# Attributes that contain the Domain name in them
elif x.upper() in self.domain_attributes:
p = [self.fix_domain_name(j) for j in self.attributes[x]]
q = [other.fix_domain_name(j) for j in other.attributes[x]]
if p == q:
if title:
print title
title = None
if p and q:
print 8*" " + x + " -> \n* %s\n* %s" % (p, q)
print 8*" " + x + " -> \n* %s\n* %s" % (self.attributes[x], other.attributes[x])
res = False
if self.unique_attrs + other.unique_attrs != []:
assert self.unique_attrs != other.unique_attrs
self.summary["unique_attrs"] += self.unique_attrs
self.summary["df_value_attrs"] += self.df_value_attrs
other.summary["unique_attrs"] += other.unique_attrs
other.summary["df_value_attrs"] += self.df_value_attrs # they are the same
return res
class AdBundel(object):
def __init__(self, con, context=None, dn_list=None):
self.con = con
self.summary = {}
self.summary["unique_attrs"] = []
self.summary["df_value_attrs"] = []
self.summary["known_ignored_dn"] = []
self.summary["abnormal_ignored_dn"] = []
if dn_list:
self.dn_list = dn_list
elif context.upper() in ["DOMAIN", "CONFIGURATION", "SCHEMA"]:
self.context = context.upper()
self.dn_list = self.get_dn_list(context)
raise Exception("Unknown initialization data for AdBundel().")
self.dn_list = [x[:len(x)-len(self.con.base_dn)] + "${DOMAIN_DN}" for x in self.dn_list]
self.dn_list = list(set(self.dn_list))
self.dn_list = sorted(self.dn_list)
self.size = len(self.dn_list)
def update_size(self):
self.size = len(self.dn_list)
self.dn_list = sorted(self.dn_list)
def __eq__(self, other):
res = True
if self.size != other.size:
print "Lists have different size: %s != %s" % (self.size, other.size)
res = False
print "\n* DNs found only in %s:" % self.con.base_dn
for x in self.dn_list:
if not x.upper() in [q.upper() for q in other.dn_list]:
print " %s" % x
self.dn_list[self.dn_list.index(x)] = ""
self.dn_list = [x for x in self.dn_list if x]
print "\n* DNs found only in %s:" % other.con.base_dn
for x in other.dn_list:
if not x.upper() in [q.upper() for q in self.dn_list]:
print " %s" % x
other.dn_list[other.dn_list.index(x)] = ""
other.dn_list = [x for x in other.dn_list if x]
print "%s == %s" % (self.size, other.size)
assert self.size == other.size
assert sorted([x.upper() for x in self.dn_list]) == sorted([x.upper() for x in other.dn_list])
index = 0
while index < self.size:
skip = False
object1 = AdObject(self.con, self.dn_list[index], self.summary)
except LdbError, (ERR_NO_SUCH_OBJECT, _):
print "\n!!! Object not found:", self.dn_list[index]
skip = True
object2 = AdObject(other.con, other.dn_list[index], other.summary)
except LdbError, (ERR_NO_SUCH_OBJECT, _):
print "\n!!! Object not found:", other.dn_list[index]
skip = True
if skip:
index += 1
print "\nComparing:\n'%s'\n'%s'" % (object1.dn, object2.dn)
if object1 == object2:
print 4*" " + "OK"
print 4*" " + "FAILED"
res = False
self.summary = object1.summary
other.summary = object2.summary
index += 1
return res
def is_ignored(self, dn):
ignore_list = {
"DOMAIN" : [
# Default naming context
"^CN=BCKUPKEY_[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12} Secret,CN=System,",
"^CN=BCKUPKEY_[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12} Secret,CN=System,",
"^CN=Domain System Volume (SYSVOL share),CN=NTFRS Subscriptions,CN=.+?,OU=Domain Controllers,",
"^CN=NTFRS Subscriptions,CN=.+?,OU=Domain Controllers,",
"^CN=RID Set,CN=.+?,OU=Domain Controllers,",
"^CN=.+?,CN=Domain System Volume \(SYSVOL share\),CN=File Replication Service,CN=System,",
"^CN=.+?,OU=Domain Controllers,",
# After Exchange preps
"^CN=OWAScratchPad.[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.,CN=Microsoft Exchange System Objects,",
"^CN=StoreEvents.[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.,CN=Microsoft ExchangeSystem Objects,",
"^CN=SystemMailbox.[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.,CN=Microsoft Exchange System Objects,",
# Configuration naming context
"^CN=NTDS Settings,CN=.+?,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,",
"^CN=%s,CN=Partitions,CN=Configuration," % self.con.netbios_name,
# This one has to be investigated
"^CN=Default Query Policy,CN=Query-Policies,CN=Directory Service,CN=WindowsNT,CN=Services,CN=Configuration,",
# After Exchange preps
"^CN=SMTP \(.+?-\{[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\}\),CN=Connections,CN=First Organization,CN=Microsoft Exchange,CN=Services,CN=Configuration,", # x 3 times
"SCHEMA" : [
#ignore_list = {}
for x in ignore_list[self.context]:
if re.match(x.upper(), dn.upper()):
return True
return False
def get_dn_list(self, context):
""" Query LDAP server about the DNs of certain naming self.con.ext Domain (or Default), Configuration, Schema.
Parse all DNs and filter those that are 'strange' or abnormal.
if context.upper() == "DOMAIN":
search_base = "%s" % self.con.base_dn
elif context.upper() == "CONFIGURATION":
search_base = "CN=Configuration,%s" % self.con.base_dn
elif context.upper() == "SCHEMA":
search_base = "CN=Schema,CN=Configuration,%s" % self.con.base_dn
dn_list = []
res = self.con.ldb.search(base=search_base, scope=SCOPE_SUBTREE, attrs=["dn"])
for x in res:
global summary
print "\nIgnored (strange) DNs in %s:" % self.con.base_dn
for x in dn_list:
xx = "".join(re.findall("[Cc][Nn]=.*?,", x)) \
+ "".join(re.findall("[Oo][Uu]=.*?,", x)) \
+ "".join(re.findall("[Dd][Cc]=.*?,", x)) + re.search("([Dd][Cc]=[\w^=]*?$)", x).group()
if x != xx:
print 4*" " + x
dn_list[dn_list.index(x)] = ""
print "\nKnown DN ignore list for %s" % self.con.base_dn
for x in dn_list:
if self.is_ignored(x):
print 4*" " + x
dn_list[dn_list.index(x)] = ""
dn_list = [x for x in dn_list if x]
return dn_list
def print_summary(self):
self.summary["unique_attrs"] = list(set(self.summary["unique_attrs"]))
self.summary["df_value_attrs"] = list(set(self.summary["df_value_attrs"]))
print "\nAttributes found only in %s:" % self.con.base_dn
print "".join([str("\n" + 4*" " + x) for x in self.summary["unique_attrs"]])
print "\nAttributes with different values:"
print "".join([str("\n" + 4*" " + x) for x in self.summary["df_value_attrs"]])
self.summary["df_value_attrs"] = []
if __name__ == "__main__":
parser = OptionParser("ldapcmp [options] domain|configuration|schema")
sambaopts = options.SambaOptions(parser)
credopts = options.CredentialsOptionsDouble(parser)
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
creds2 = credopts.get_credentials2(lp)
parser.add_option("", "--host", dest="host",
help="IP of the first LDAP server",)
parser.add_option("", "--host2", dest="host2",
help="IP of the second LDAP server",)
(options, args) = parser.parse_args()
if not (len(args) == 1 and args[0].upper() in ["DOMAIN", "CONFIGURATION", "SCHEMA"]):
parser.error("Incorrect arguments")
con1 = LDAPBase(options.host, creds, lp)
assert len(con1.base_dn) > 0
con2 = LDAPBase(options.host2, creds2, lp)
assert len(con2.base_dn) > 0
b1 = AdBundel(con1, args[0])
b2 = AdBundel(con2, args[0])
if b1 == b2:
print "\n\nFinal result: SUCCESS!"
status = 0
print "\n\nFinal result: FAILURE!"
status = 1
assert len(b1.summary["df_value_attrs"]) == len(b2.summary["df_value_attrs"])
print "\nSUMMARY"
print "---------"

View File

@ -114,3 +114,55 @@ class CredentialsOptions(optparse.OptionGroup):
if not self.no_pass:
return self.creds
class CredentialsOptionsDouble(CredentialsOptions):
"""Command line options for specifying credentials of two servers."""
def __init__(self, parser):
CredentialsOptions.__init__(self, parser)
self.no_pass2 = False
self.add_option("--simple-bind-dn2", metavar="DN2", action="callback",
callback=self._set_simple_bind_dn2, type=str,
help="DN to use for a simple bind")
self.add_option("--password2", metavar="PASSWORD2", action="callback",
help="Password", type=str, callback=self._set_password2)
self.add_option("--username2", metavar="USERNAME2",
action="callback", type=str,
help="Username for second server", callback=self._parse_username2)
self.add_option("--workgroup2", metavar="WORKGROUP2",
action="callback", type=str,
help="Workgroup for second server", callback=self._parse_workgroup2)
self.add_option("--no-pass2", action="store_true",
help="Don't ask for a password for the second server")
self.add_option("--kerberos2", metavar="KERBEROS2",
action="callback", type=str,
help="Use Kerberos", callback=self._set_kerberos2)
self.creds2 = Credentials()
def _parse_username2(self, option, opt_str, arg, parser):
def _parse_workgroup2(self, option, opt_str, arg, parser):
def _set_password2(self, option, opt_str, arg, parser):
def _set_kerberos2(self, option, opt_str, arg, parser):
if bool(arg) or arg.lower() == "yes":
def _set_simple_bind_dn2(self, option, opt_str, arg, parser):
def get_credentials2(self, lp):
"""Obtain the credentials set on the command-line.
:param lp: Loadparm object to use.
:return: Credentials object
if not self.no_pass2:
return self.creds2